mod upsert;
use std::collections::HashMap;
use std::sync::Arc;
use sqlx::PgPool;
use systemprompt_database::DbPool;
use systemprompt_identifiers::MarketplaceId;
use systemprompt_models::services::MarketplaceConfig;
use super::config::AccessControlConfig;
use super::error::{AuthzError, AuthzResult};
use super::types::{EntityKind, RuleType};
use upsert::{
Target, UpsertOutcome, expand_targets, upsert_entity_row, upsert_marketplace_entity_row,
upsert_target,
};
#[derive(Debug, Clone, Copy, Default)]
pub struct IngestOptions {
pub override_existing: bool,
pub delete_orphans: bool,
}
#[derive(Debug, Clone, Copy, Default)]
pub struct IngestReport {
pub inserted: usize,
pub updated: usize,
pub skipped: usize,
pub deleted: usize,
}
#[derive(Debug, Clone)]
pub struct AccessControlIngestionService {
write_pool: Arc<PgPool>,
}
impl AccessControlIngestionService {
pub fn new(db: &DbPool) -> AuthzResult<Self> {
let write_pool = db
.write_pool_arc()
.map_err(|err| AuthzError::Validation(err.to_string()))?;
Ok(Self { write_pool })
}
pub const fn from_pool(pool: Arc<PgPool>) -> Self {
Self { write_pool: pool }
}
pub async fn ingest_config(
&self,
cfg: &AccessControlConfig,
options: IngestOptions,
) -> AuthzResult<IngestReport> {
cfg.validate()?;
let targets = expand_targets(&cfg.rules);
let mut tx = self.write_pool.begin().await?;
let mut report = IngestReport::default();
if options.delete_orphans {
let entity_types: Vec<String> = targets
.iter()
.map(|t| t.entity_kind.as_str().to_owned())
.collect();
let entity_ids: Vec<String> = targets.iter().map(|t| t.entity_id.to_owned()).collect();
let res = sqlx::query!(
r#"
DELETE FROM access_control_rules
WHERE rule_type = 'role'
AND (entity_type, entity_id) IN (
SELECT * FROM UNNEST($1::text[], $2::text[])
)
"#,
&entity_types,
&entity_ids,
)
.execute(&mut *tx)
.await?;
report.deleted = res.rows_affected() as usize;
}
for target in &targets {
upsert_entity_row(&mut tx, target).await?;
let outcome = upsert_target(&mut tx, target, options.override_existing).await?;
match outcome {
UpsertOutcome::Inserted => report.inserted += 1,
UpsertOutcome::Updated => report.updated += 1,
UpsertOutcome::Skipped => report.skipped += 1,
}
}
tx.commit().await?;
tracing::info!(
target = "bootstrap_access_control_loaded",
inserted = report.inserted,
updated = report.updated,
skipped = report.skipped,
deleted = report.deleted,
override_existing = options.override_existing,
delete_orphans = options.delete_orphans,
"access-control YAML ingested",
);
Ok(report)
}
pub async fn ingest_marketplace_access(
&self,
marketplaces: &HashMap<MarketplaceId, MarketplaceConfig>,
options: IngestOptions,
) -> AuthzResult<IngestReport> {
let mut tx = self.write_pool.begin().await?;
let mut report = IngestReport::default();
let mut ingested_ids: Vec<String> = Vec::new();
for (id, cfg) in marketplaces {
if cfg.access.roles.is_empty() {
continue;
}
ingested_ids.push(id.as_str().to_owned());
}
if options.delete_orphans && !ingested_ids.is_empty() {
let res = sqlx::query!(
r#"
DELETE FROM access_control_rules
WHERE rule_type = 'role'
AND entity_type = 'marketplace'
AND entity_id = ANY($1::text[])
"#,
&ingested_ids,
)
.execute(&mut *tx)
.await?;
report.deleted = res.rows_affected() as usize;
}
for (id, cfg) in marketplaces {
if cfg.access.roles.is_empty() {
continue;
}
let entity_id = id.as_str();
upsert_marketplace_entity_row(&mut tx, entity_id, cfg.access.default_included).await?;
for role in &cfg.access.roles {
let target = Target {
entity_kind: EntityKind::Marketplace,
entity_id,
rule_type: RuleType::Role,
rule_value: role.as_str(),
access: "allow",
justification: cfg.access.justification.as_deref(),
};
let outcome = upsert_target(&mut tx, &target, options.override_existing).await?;
match outcome {
UpsertOutcome::Inserted => report.inserted += 1,
UpsertOutcome::Updated => report.updated += 1,
UpsertOutcome::Skipped => report.skipped += 1,
}
}
}
tx.commit().await?;
tracing::info!(
target = "bootstrap_marketplace_access_loaded",
inserted = report.inserted,
updated = report.updated,
skipped = report.skipped,
deleted = report.deleted,
override_existing = options.override_existing,
delete_orphans = options.delete_orphans,
"marketplace access blocks ingested",
);
Ok(report)
}
}