systemprompt_security/authz/
ingestion.rs1use std::sync::Arc;
16
17use sqlx::PgPool;
18use systemprompt_database::DbPool;
19use systemprompt_identifiers::RuleId;
20
21use super::config::{AccessControlConfig, RuleEntry};
22use super::error::{AuthzError, AuthzResult};
23use super::types::RuleType;
24
25const DEFAULT_SENTINEL_VALUE: &str = "__default__";
26
27#[derive(Debug, Clone, Copy, Default)]
28pub struct IngestOptions {
29 pub override_existing: bool,
30 pub delete_orphans: bool,
31}
32
33#[derive(Debug, Clone, Copy, Default)]
34pub struct IngestReport {
35 pub departments_declared: usize,
36 pub rules_inserted: usize,
37 pub rules_updated: usize,
38 pub rules_skipped: usize,
39 pub rules_deleted: usize,
40}
41
42#[derive(Debug, Clone)]
43pub struct AccessControlIngestionService {
44 write_pool: Arc<PgPool>,
45}
46
47impl AccessControlIngestionService {
48 pub fn new(db: &DbPool) -> AuthzResult<Self> {
49 let write_pool = db
50 .write_pool_arc()
51 .map_err(|err| AuthzError::Validation(err.to_string()))?;
52 Ok(Self { write_pool })
53 }
54
55 pub const fn from_pool(pool: Arc<PgPool>) -> Self {
56 Self { write_pool: pool }
57 }
58
59 pub async fn ingest_config(
60 &self,
61 cfg: &AccessControlConfig,
62 options: IngestOptions,
63 ) -> AuthzResult<IngestReport> {
64 cfg.validate()?;
65
66 let targets = expand_targets(&cfg.rules);
67
68 let mut tx = self.write_pool.begin().await?;
69 let mut report = IngestReport {
70 departments_declared: cfg.departments.len(),
71 ..IngestReport::default()
72 };
73
74 if options.delete_orphans {
75 let res = sqlx::query!(
76 r#"
77 DELETE FROM access_control_rules
78 WHERE rule_type IN ('role', 'department')
79 AND rule_value <> $1
80 "#,
81 DEFAULT_SENTINEL_VALUE,
82 )
83 .execute(&mut *tx)
84 .await?;
85 report.rules_deleted = res.rows_affected() as usize;
86 }
87
88 for target in &targets {
89 let outcome = upsert_target(&mut tx, target, options.override_existing).await?;
90 match outcome {
91 UpsertOutcome::Inserted => report.rules_inserted += 1,
92 UpsertOutcome::Updated => report.rules_updated += 1,
93 UpsertOutcome::Skipped => report.rules_skipped += 1,
94 }
95 }
96
97 tx.commit().await?;
98
99 tracing::info!(
100 target = "bootstrap_access_control_loaded",
101 departments_declared = report.departments_declared,
102 rules_inserted = report.rules_inserted,
103 rules_updated = report.rules_updated,
104 rules_skipped = report.rules_skipped,
105 rules_deleted = report.rules_deleted,
106 override_existing = options.override_existing,
107 delete_orphans = options.delete_orphans,
108 "access-control YAML ingested",
109 );
110
111 Ok(report)
112 }
113}
114
115#[derive(Debug)]
116struct Target<'a> {
117 entity_type: &'a str,
118 entity_id: &'a str,
119 rule_type: RuleType,
120 rule_value: &'a str,
121 access: &'static str,
122 justification: Option<&'a str>,
123}
124
125fn expand_targets(rules: &[RuleEntry]) -> Vec<Target<'_>> {
126 let mut out = Vec::with_capacity(rules.len());
127 for rule in rules {
128 let access_str = match rule.access {
129 super::types::Access::Allow => "allow",
130 super::types::Access::Deny => "deny",
131 };
132 for role in &rule.roles {
133 out.push(Target {
134 entity_type: rule.entity_type.as_str(),
135 entity_id: rule.entity_id.as_str(),
136 rule_type: RuleType::Role,
137 rule_value: role.as_str(),
138 access: access_str,
139 justification: rule.justification.as_deref(),
140 });
141 }
142 for dept in &rule.departments {
143 out.push(Target {
144 entity_type: rule.entity_type.as_str(),
145 entity_id: rule.entity_id.as_str(),
146 rule_type: RuleType::Department,
147 rule_value: dept.as_str(),
148 access: access_str,
149 justification: rule.justification.as_deref(),
150 });
151 }
152 }
153 out
154}
155
156#[derive(Debug, Clone, Copy)]
157enum UpsertOutcome {
158 Inserted,
159 Updated,
160 Skipped,
161}
162
163async fn upsert_target(
164 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
165 target: &Target<'_>,
166 override_existing: bool,
167) -> AuthzResult<UpsertOutcome> {
168 let existing = sqlx::query!(
169 r#"
170 SELECT id, access, justification
171 FROM access_control_rules
172 WHERE entity_type = $1 AND entity_id = $2
173 AND rule_type = $3 AND rule_value = $4
174 "#,
175 target.entity_type,
176 target.entity_id,
177 target.rule_type.to_string(),
178 target.rule_value,
179 )
180 .fetch_optional(&mut **tx)
181 .await?;
182
183 if let Some(row) = existing {
184 if !override_existing {
185 return Ok(UpsertOutcome::Skipped);
186 }
187 let unchanged =
188 row.access == target.access && row.justification.as_deref() == target.justification;
189 if unchanged {
190 return Ok(UpsertOutcome::Skipped);
191 }
192 sqlx::query!(
193 r#"
194 UPDATE access_control_rules
195 SET access = $2,
196 justification = $3,
197 updated_at = NOW()
198 WHERE id = $1
199 "#,
200 row.id,
201 target.access,
202 target.justification,
203 )
204 .execute(&mut **tx)
205 .await?;
206 Ok(UpsertOutcome::Updated)
207 } else {
208 let id = RuleId::generate();
209 sqlx::query!(
210 r#"
211 INSERT INTO access_control_rules
212 (id, entity_type, entity_id, rule_type, rule_value, access,
213 default_included, justification)
214 VALUES ($1, $2, $3, $4, $5, $6, false, $7)
215 "#,
216 id.as_str(),
217 target.entity_type,
218 target.entity_id,
219 target.rule_type.to_string(),
220 target.rule_value,
221 target.access,
222 target.justification,
223 )
224 .execute(&mut **tx)
225 .await?;
226 Ok(UpsertOutcome::Inserted)
227 }
228}