systemprompt_sync/jobs/
access_control_sync.rs1use std::path::PathBuf;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use systemprompt_database::{Database, DbPool};
14use systemprompt_models::AppPaths;
15use systemprompt_security::authz::{AccessControlIngestionService, IngestOptions};
16use systemprompt_traits::{Job, JobContext, JobResult, ProviderError, ProviderResult};
17
18use crate::local::AccessControlLocalSync;
19
20const DEFAULT_YAML_RELATIVE: &str = "access-control/config.yaml";
21
/// Stateless unit type that carries the [`Job`] implementation for the
/// `access_control_sync` job.
///
/// The type holds no data, so copies are free and every value is
/// interchangeable with every other.
#[derive(Clone, Copy, Debug)]
pub struct AccessControlSyncJob;
24
25#[async_trait]
26impl Job for AccessControlSyncJob {
27 fn name(&self) -> &'static str {
28 "access_control_sync"
29 }
30
31 fn description(&self) -> &'static str {
32 "Project services/access-control YAML into access_control_rules"
33 }
34
35 fn schedule(&self) -> &'static str {
36 ""
37 }
38
39 fn tags(&self) -> Vec<&'static str> {
40 vec!["access-control", "sync", "bootstrap"]
41 }
42
43 fn enabled(&self) -> bool {
44 false
45 }
46
47 async fn execute(&self, ctx: &JobContext) -> ProviderResult<JobResult> {
48 let start = std::time::Instant::now();
49
50 let db_pool: &DbPool = ctx.db_pool::<DbPool>().ok_or_else(|| {
51 ProviderError::Configuration("DbPool not available in job context".into())
52 })?;
53
54 let paths = ctx
55 .app_paths::<Arc<AppPaths>>()
56 .ok_or_else(|| {
57 ProviderError::Configuration("AppPaths not available in job context".into())
58 })?
59 .as_ref();
60
61 let yaml_path = resolve_yaml_path(ctx, paths.system().services());
62 let override_existing = bool_param(ctx, "override_existing", true);
63 let delete_orphans = bool_param(ctx, "delete_orphans", true);
64
65 tracing::info!(
66 yaml_path = %yaml_path.display(),
67 override_existing,
68 delete_orphans,
69 "access_control_sync job started",
70 );
71
72 let sync = AccessControlLocalSync::new(Arc::<Database>::clone(db_pool), yaml_path);
73 let acl = sync
74 .sync_to_db(override_existing, delete_orphans)
75 .await
76 .map_err(|e| ProviderError::RenderFailed(e.to_string()))?;
77
78 let services = systemprompt_loader::ConfigLoader::load().map_err(|e| {
79 ProviderError::Configuration(format!("Failed to load services config: {e}"))
80 })?;
81 let svc = AccessControlIngestionService::new(db_pool)
82 .map_err(|e| ProviderError::Configuration(e.to_string()))?;
83 let mkt = svc
84 .ingest_marketplace_access(
85 &services.marketplaces,
86 IngestOptions {
87 override_existing,
88 delete_orphans,
89 },
90 )
91 .await
92 .map_err(|e| ProviderError::RenderFailed(e.to_string()))?;
93
94 let items_synced = acl.items_synced + mkt.inserted + mkt.updated;
95 let items_skipped = acl.items_skipped + mkt.skipped;
96 let items_deleted = acl.items_deleted + mkt.deleted;
97
98 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
99 tracing::info!(
100 items_synced,
101 items_skipped,
102 items_deleted,
103 duration_ms,
104 "access_control_sync job completed",
105 );
106
107 Ok(JobResult::success()
108 .with_stats(items_synced as u64, acl.errors.len() as u64)
109 .with_duration(duration_ms))
110 }
111}
112
113fn resolve_yaml_path(ctx: &JobContext, services_path: &std::path::Path) -> PathBuf {
114 ctx.parameters().get("yaml_path").map_or_else(
115 || services_path.join(DEFAULT_YAML_RELATIVE),
116 |raw| {
117 let p = std::path::Path::new(raw);
118 if p.is_absolute() {
119 p.to_path_buf()
120 } else {
121 services_path.join(p)
122 }
123 },
124 )
125}
126
127fn bool_param(ctx: &JobContext, key: &str, default: bool) -> bool {
128 ctx.parameters().get(key).map_or(default, |v| {
129 matches!(v.as_str(), "true" | "1" | "yes" | "TRUE" | "True")
130 })
131}
132
// Hand a reference to the job to the provider-contracts `submit_job!` macro.
// NOTE(review): presumably this registers the job with a global job registry
// so it can be looked up by name — confirm against the macro's definition.
systemprompt_provider_contracts::submit_job!(&AccessControlSyncJob);