use std::collections::HashSet;
use reifydb_core::interface::catalog::{
change::{
CatalogTrackGrantedRoleChangeOperations, CatalogTrackIdentityChangeOperations,
CatalogTrackRoleChangeOperations,
},
identity::{GrantedRole, Identity, Role, RoleId},
};
use reifydb_runtime::context::{clock::Clock, rng::Rng};
use reifydb_transaction::{
change::{TransactionalGrantedRoleChanges, TransactionalIdentityChanges, TransactionalRoleChanges},
transaction::{Transaction, admin::AdminTransaction},
};
use reifydb_type::{fragment::Fragment, value::identity::IdentityId};
use tracing::{instrument, warn};
use crate::{
CatalogStore, Result,
catalog::Catalog,
error::{CatalogError, CatalogObjectKind},
};
impl Catalog {
#[instrument(name = "catalog::identity::find_by_name", level = "trace", skip(self, txn))]
pub fn find_identity_by_name(&self, txn: &mut Transaction<'_>, name: &str) -> Result<Option<Identity>> {
match txn.reborrow() {
Transaction::Admin(admin) => {
if let Some(ident) = TransactionalIdentityChanges::find_identity_by_name(admin, name) {
return Ok(Some(ident.clone()));
}
if TransactionalIdentityChanges::is_identity_deleted_by_name(admin, name) {
return Ok(None);
}
if let Some(ident) = self.materialized.find_identity_by_name_at(name, admin.version()) {
return Ok(Some(ident));
}
if let Some(ident) =
CatalogStore::find_identity_by_name(&mut Transaction::Admin(&mut *admin), name)?
{
warn!("Identity '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(ident));
}
Ok(None)
}
Transaction::Command(cmd) => {
if let Some(ident) = self.materialized.find_identity_by_name_at(name, cmd.version()) {
return Ok(Some(ident));
}
if let Some(ident) =
CatalogStore::find_identity_by_name(&mut Transaction::Command(&mut *cmd), name)?
{
warn!("Identity '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(ident));
}
Ok(None)
}
Transaction::Query(qry) => {
if let Some(ident) = self.materialized.find_identity_by_name_at(name, qry.version()) {
return Ok(Some(ident));
}
if let Some(ident) =
CatalogStore::find_identity_by_name(&mut Transaction::Query(&mut *qry), name)?
{
warn!("Identity '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(ident));
}
Ok(None)
}
Transaction::Test(t) => {
if let Some(ident) = TransactionalIdentityChanges::find_identity_by_name(t.inner, name)
{
return Ok(Some(ident.clone()));
}
if TransactionalIdentityChanges::is_identity_deleted_by_name(t.inner, name) {
return Ok(None);
}
if let Some(ident) = self.materialized.find_identity_by_name_at(name, t.inner.version())
{
return Ok(Some(ident));
}
if let Some(ident) = CatalogStore::find_identity_by_name(
&mut Transaction::Admin(&mut *t.inner),
name,
)? {
warn!("Identity '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(ident));
}
Ok(None)
}
Transaction::Replica(rep) => {
if let Some(ident) = self.materialized.find_identity_by_name_at(name, rep.version()) {
return Ok(Some(ident));
}
if let Some(ident) =
CatalogStore::find_identity_by_name(&mut Transaction::Replica(&mut *rep), name)?
{
warn!("Identity '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(ident));
}
Ok(None)
}
}
}
#[instrument(name = "catalog::identity::find", level = "trace", skip(self, txn))]
pub fn find_identity(&self, txn: &mut Transaction<'_>, identity: IdentityId) -> Result<Option<Identity>> {
match txn.reborrow() {
Transaction::Admin(admin) => {
if let Some(ident) = TransactionalIdentityChanges::find_identity(admin, identity) {
return Ok(Some(ident.clone()));
}
if TransactionalIdentityChanges::is_identity_deleted(admin, identity) {
return Ok(None);
}
if let Some(ident) = self.materialized.find_identity_at(identity, admin.version()) {
return Ok(Some(ident));
}
if let Some(ident) =
CatalogStore::find_identity(&mut Transaction::Admin(&mut *admin), identity)?
{
warn!(
"Identity '{}' found in storage but not in MaterializedCatalog",
identity
);
return Ok(Some(ident));
}
Ok(None)
}
Transaction::Command(cmd) => {
if let Some(ident) = self.materialized.find_identity_at(identity, cmd.version()) {
return Ok(Some(ident));
}
if let Some(ident) =
CatalogStore::find_identity(&mut Transaction::Command(&mut *cmd), identity)?
{
warn!(
"Identity '{}' found in storage but not in MaterializedCatalog",
identity
);
return Ok(Some(ident));
}
Ok(None)
}
Transaction::Query(qry) => {
if let Some(ident) = self.materialized.find_identity_at(identity, qry.version()) {
return Ok(Some(ident));
}
if let Some(ident) =
CatalogStore::find_identity(&mut Transaction::Query(&mut *qry), identity)?
{
warn!(
"Identity '{}' found in storage but not in MaterializedCatalog",
identity
);
return Ok(Some(ident));
}
Ok(None)
}
Transaction::Test(t) => {
if let Some(ident) = TransactionalIdentityChanges::find_identity(t.inner, identity) {
return Ok(Some(ident.clone()));
}
if TransactionalIdentityChanges::is_identity_deleted(t.inner, identity) {
return Ok(None);
}
if let Some(ident) = self.materialized.find_identity_at(identity, t.inner.version()) {
return Ok(Some(ident));
}
if let Some(ident) =
CatalogStore::find_identity(&mut Transaction::Admin(&mut *t.inner), identity)?
{
warn!(
"Identity '{}' found in storage but not in MaterializedCatalog",
identity
);
return Ok(Some(ident));
}
Ok(None)
}
Transaction::Replica(rep) => {
if let Some(ident) = self.materialized.find_identity_at(identity, rep.version()) {
return Ok(Some(ident));
}
if let Some(ident) =
CatalogStore::find_identity(&mut Transaction::Replica(&mut *rep), identity)?
{
warn!(
"Identity '{}' found in storage but not in MaterializedCatalog",
identity
);
return Ok(Some(ident));
}
Ok(None)
}
}
}
#[instrument(name = "catalog::identity::create", level = "debug", skip(self, txn, clock, rng))]
pub fn create_identity(
&self,
txn: &mut AdminTransaction,
name: &str,
clock: &Clock,
rng: &Rng,
) -> Result<Identity> {
let ident = CatalogStore::create_identity(txn, name, clock, rng)?;
txn.track_identity_created(ident.clone())?;
Ok(ident)
}
#[instrument(name = "catalog::identity::drop", level = "debug", skip(self, txn))]
pub fn drop_identity(&self, txn: &mut AdminTransaction, identity: IdentityId) -> Result<()> {
if let Some(ident) = CatalogStore::find_identity(&mut Transaction::Admin(&mut *txn), identity)? {
CatalogStore::drop_identity(txn, identity)?;
txn.track_identity_deleted(ident)?;
} else {
CatalogStore::drop_identity(txn, identity)?;
}
Ok(())
}
#[instrument(name = "catalog::identity::list_all", level = "debug", skip(self, txn))]
pub fn list_identities_all(&self, txn: &mut Transaction<'_>) -> Result<Vec<Identity>> {
match txn.reborrow() {
Transaction::Command(cmd) => Ok(self.materialized.list_all_identities_at(cmd.version())),
Transaction::Admin(admin) => {
let mut idents = self.materialized.list_all_identities_at(admin.version());
for change in &admin.changes.identity {
if let Some(ident) = &change.post
&& !idents.iter().any(|existing| existing.id == ident.id)
{
idents.push(ident.clone());
}
}
idents.retain(|i| !admin.is_identity_deleted(i.id));
Ok(idents)
}
Transaction::Query(qry) => Ok(self.materialized.list_all_identities_at(qry.version())),
Transaction::Test(t) => {
let mut idents = self.materialized.list_all_identities_at(t.inner.version());
for change in &t.inner.changes.identity {
if let Some(ident) = &change.post
&& !idents.iter().any(|existing| existing.id == ident.id)
{
idents.push(ident.clone());
}
}
idents.retain(|i| !t.inner.is_identity_deleted(i.id));
Ok(idents)
}
Transaction::Replica(rep) => Ok(self.materialized.list_all_identities_at(rep.version())),
}
}
#[instrument(name = "catalog::role::find_by_name", level = "trace", skip(self, txn))]
pub fn find_role_by_name(&self, txn: &mut Transaction<'_>, name: &str) -> Result<Option<Role>> {
match txn.reborrow() {
Transaction::Admin(admin) => {
if let Some(role) = TransactionalRoleChanges::find_role_by_name(admin, name) {
return Ok(Some(role.clone()));
}
if TransactionalRoleChanges::is_role_deleted_by_name(admin, name) {
return Ok(None);
}
if let Some(role) = self.materialized.find_role_by_name_at(name, admin.version()) {
return Ok(Some(role));
}
if let Some(role) =
CatalogStore::find_role_by_name(&mut Transaction::Admin(&mut *admin), name)?
{
warn!("Role '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(role));
}
Ok(None)
}
Transaction::Command(cmd) => {
if let Some(role) = self.materialized.find_role_by_name_at(name, cmd.version()) {
return Ok(Some(role));
}
if let Some(role) =
CatalogStore::find_role_by_name(&mut Transaction::Command(&mut *cmd), name)?
{
warn!("Role '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(role));
}
Ok(None)
}
Transaction::Query(qry) => {
if let Some(role) = self.materialized.find_role_by_name_at(name, qry.version()) {
return Ok(Some(role));
}
if let Some(role) =
CatalogStore::find_role_by_name(&mut Transaction::Query(&mut *qry), name)?
{
warn!("Role '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(role));
}
Ok(None)
}
Transaction::Test(t) => {
if let Some(role) = TransactionalRoleChanges::find_role_by_name(t.inner, name) {
return Ok(Some(role.clone()));
}
if TransactionalRoleChanges::is_role_deleted_by_name(t.inner, name) {
return Ok(None);
}
if let Some(role) = self.materialized.find_role_by_name_at(name, t.inner.version()) {
return Ok(Some(role));
}
if let Some(role) =
CatalogStore::find_role_by_name(&mut Transaction::Admin(&mut *t.inner), name)?
{
warn!("Role '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(role));
}
Ok(None)
}
Transaction::Replica(rep) => {
if let Some(role) = self.materialized.find_role_by_name_at(name, rep.version()) {
return Ok(Some(role));
}
if let Some(role) =
CatalogStore::find_role_by_name(&mut Transaction::Replica(&mut *rep), name)?
{
warn!("Role '{}' found in storage but not in MaterializedCatalog", name);
return Ok(Some(role));
}
Ok(None)
}
}
}
#[instrument(name = "catalog::role::find", level = "trace", skip(self, txn))]
pub fn find_role(&self, txn: &mut Transaction<'_>, role_id: RoleId) -> Result<Option<Role>> {
match txn.reborrow() {
Transaction::Admin(admin) => {
if let Some(role) = TransactionalRoleChanges::find_role(admin, role_id) {
return Ok(Some(role.clone()));
}
if TransactionalRoleChanges::is_role_deleted(admin, role_id) {
return Ok(None);
}
if let Some(role) = self.materialized.find_role_at(role_id, admin.version()) {
return Ok(Some(role));
}
if let Some(role) =
CatalogStore::find_role(&mut Transaction::Admin(&mut *admin), role_id)?
{
warn!("Role '{}' found in storage but not in MaterializedCatalog", role_id);
return Ok(Some(role));
}
Ok(None)
}
Transaction::Command(cmd) => {
if let Some(role) = self.materialized.find_role_at(role_id, cmd.version()) {
return Ok(Some(role));
}
if let Some(role) =
CatalogStore::find_role(&mut Transaction::Command(&mut *cmd), role_id)?
{
warn!("Role '{}' found in storage but not in MaterializedCatalog", role_id);
return Ok(Some(role));
}
Ok(None)
}
Transaction::Query(qry) => {
if let Some(role) = self.materialized.find_role_at(role_id, qry.version()) {
return Ok(Some(role));
}
if let Some(role) =
CatalogStore::find_role(&mut Transaction::Query(&mut *qry), role_id)?
{
warn!("Role '{}' found in storage but not in MaterializedCatalog", role_id);
return Ok(Some(role));
}
Ok(None)
}
Transaction::Test(t) => {
if let Some(role) = TransactionalRoleChanges::find_role(t.inner, role_id) {
return Ok(Some(role.clone()));
}
if TransactionalRoleChanges::is_role_deleted(t.inner, role_id) {
return Ok(None);
}
if let Some(role) = self.materialized.find_role_at(role_id, t.inner.version()) {
return Ok(Some(role));
}
if let Some(role) =
CatalogStore::find_role(&mut Transaction::Admin(&mut *t.inner), role_id)?
{
warn!("Role '{}' found in storage but not in MaterializedCatalog", role_id);
return Ok(Some(role));
}
Ok(None)
}
Transaction::Replica(rep) => {
if let Some(role) = self.materialized.find_role_at(role_id, rep.version()) {
return Ok(Some(role));
}
if let Some(role) =
CatalogStore::find_role(&mut Transaction::Replica(&mut *rep), role_id)?
{
warn!("Role '{}' found in storage but not in MaterializedCatalog", role_id);
return Ok(Some(role));
}
Ok(None)
}
}
}
#[instrument(name = "catalog::role::create", level = "debug", skip(self, txn))]
pub fn create_role(&self, txn: &mut AdminTransaction, name: &str) -> Result<Role> {
let role = CatalogStore::create_role(txn, name)?;
txn.track_role_created(role.clone())?;
Ok(role)
}
#[instrument(name = "catalog::role::drop", level = "debug", skip(self, txn))]
pub fn drop_role(&self, txn: &mut AdminTransaction, role_id: RoleId) -> Result<()> {
if let Some(role) = CatalogStore::find_role(&mut Transaction::Admin(&mut *txn), role_id)? {
CatalogStore::drop_role(txn, role_id)?;
txn.track_role_deleted(role)?;
} else {
CatalogStore::drop_role(txn, role_id)?;
}
Ok(())
}
#[instrument(name = "catalog::identity::grant_role", level = "debug", skip(self, txn))]
pub fn grant_role(
&self,
txn: &mut AdminTransaction,
identity: IdentityId,
role_id: RoleId,
) -> Result<GrantedRole> {
let ir = CatalogStore::grant_role(txn, identity, role_id)?;
txn.track_granted_role_created(ir.clone())?;
Ok(ir)
}
#[instrument(name = "catalog::identity::revoke_role", level = "debug", skip(self, txn))]
pub fn revoke_role(&self, txn: &mut AdminTransaction, identity: IdentityId, role_id: RoleId) -> Result<()> {
let ir = GrantedRole {
identity,
role_id,
};
CatalogStore::revoke_role(txn, identity, role_id)?;
txn.track_granted_role_deleted(ir)?;
Ok(())
}
#[instrument(name = "catalog::identity::find_role_names_for_identity", level = "trace", skip(self, txn))]
pub fn find_role_names_for_identity(
&self,
txn: &mut Transaction<'_>,
identity: IdentityId,
) -> Result<Vec<String>> {
match txn.reborrow() {
Transaction::Admin(admin) => {
let version = admin.version();
let mut names = Vec::new();
let mut seen_roles = HashSet::new();
for ir in TransactionalGrantedRoleChanges::find_granted_roles_for_identity(
admin, identity,
) {
if !TransactionalRoleChanges::is_role_deleted(admin, ir.role_id) {
if let Some(role) =
TransactionalRoleChanges::find_role(admin, ir.role_id)
{
seen_roles.insert(ir.role_id);
names.push(role.name.clone());
} else if let Some(role) =
self.materialized.find_role_at(ir.role_id, version)
{
seen_roles.insert(ir.role_id);
names.push(role.name);
}
}
}
for ir in self.materialized.find_granted_roles_at(identity, version) {
if !seen_roles.contains(&ir.role_id)
&& !TransactionalGrantedRoleChanges::is_granted_role_deleted(
admin, identity, ir.role_id,
) && let Some(role) = self.materialized.find_role_at(ir.role_id, version)
{
names.push(role.name);
}
}
Ok(names)
}
Transaction::Test(t) => {
let version = t.inner.version();
let mut names = Vec::new();
let mut seen_roles = HashSet::new();
for ir in TransactionalGrantedRoleChanges::find_granted_roles_for_identity(
t.inner, identity,
) {
if !TransactionalRoleChanges::is_role_deleted(t.inner, ir.role_id) {
if let Some(role) =
TransactionalRoleChanges::find_role(t.inner, ir.role_id)
{
seen_roles.insert(ir.role_id);
names.push(role.name.clone());
} else if let Some(role) =
self.materialized.find_role_at(ir.role_id, version)
{
seen_roles.insert(ir.role_id);
names.push(role.name);
}
}
}
for ir in self.materialized.find_granted_roles_at(identity, version) {
if !seen_roles.contains(&ir.role_id)
&& !TransactionalGrantedRoleChanges::is_granted_role_deleted(
t.inner, identity, ir.role_id,
) && let Some(role) = self.materialized.find_role_at(ir.role_id, version)
{
names.push(role.name);
}
}
Ok(names)
}
_ => {
let version = match txn.reborrow() {
Transaction::Command(cmd) => cmd.version(),
Transaction::Query(qry) => qry.version(),
Transaction::Replica(rep) => rep.version(),
_ => unreachable!(),
};
let granted_roles = self.materialized.find_granted_roles_at(identity, version);
let mut names = Vec::with_capacity(granted_roles.len());
for ir in granted_roles {
if let Some(role) = self.materialized.find_role_at(ir.role_id, version) {
names.push(role.name);
}
}
Ok(names)
}
}
}
pub fn get_identity_by_name(&self, txn: &mut Transaction<'_>, name: &str) -> Result<Identity> {
self.find_identity_by_name(txn, name)?.ok_or_else(|| {
CatalogError::NotFound {
kind: CatalogObjectKind::Identity,
namespace: "system".to_string(),
name: name.to_string(),
fragment: Fragment::None,
}
.into()
})
}
pub fn get_role_by_name(&self, txn: &mut Transaction<'_>, name: &str) -> Result<Role> {
self.find_role_by_name(txn, name)?.ok_or_else(|| {
CatalogError::NotFound {
kind: CatalogObjectKind::Role,
namespace: "system".to_string(),
name: name.to_string(),
fragment: Fragment::None,
}
.into()
})
}
}