Skip to main content

systemprompt_sync/jobs/
access_control_sync.rs

1//! Bootstrap job that projects the access-control baseline into the database:
2//! the YAML grants at `access-control/config.yaml` plus the marketplace-access
3//! grants declared in the services config.
4//!
5//! Mirrors [`super::ContentSyncJob`] but with a fixed direction
6//! (config → DB). Disabled by default; operators wire it in via
7//! `scheduler_config.bootstrap_jobs` so it runs once at startup.
8
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use systemprompt_database::{Database, DbPool};
14use systemprompt_models::AppPaths;
15use systemprompt_security::authz::{AccessControlIngestionService, IngestOptions};
16use systemprompt_traits::{Job, JobContext, JobResult, ProviderError, ProviderResult};
17
18use crate::local::AccessControlLocalSync;
19
/// Location of the access-control YAML relative to the services path.
///
/// `resolve_yaml_path` joins this onto the services path when the job context
/// carries no `yaml_path` parameter.
const DEFAULT_YAML_RELATIVE: &str = "access-control/config.yaml";
21
/// Stateless marker type for the access-control bootstrap job.
///
/// It carries no data; everything the job needs at run time is read from the
/// `JobContext` passed to `execute`.
#[derive(Clone, Copy, Debug)]
pub struct AccessControlSyncJob;
24
25#[async_trait]
26impl Job for AccessControlSyncJob {
27    fn name(&self) -> &'static str {
28        "access_control_sync"
29    }
30
31    fn description(&self) -> &'static str {
32        "Project services/access-control YAML into access_control_rules"
33    }
34
35    fn schedule(&self) -> &'static str {
36        ""
37    }
38
39    fn tags(&self) -> Vec<&'static str> {
40        vec!["access-control", "sync", "bootstrap"]
41    }
42
43    fn enabled(&self) -> bool {
44        false
45    }
46
47    async fn execute(&self, ctx: &JobContext) -> ProviderResult<JobResult> {
48        let start = std::time::Instant::now();
49
50        let db_pool: &DbPool = ctx.db_pool::<DbPool>().ok_or_else(|| {
51            ProviderError::Configuration("DbPool not available in job context".into())
52        })?;
53
54        let paths = ctx
55            .app_paths::<Arc<AppPaths>>()
56            .ok_or_else(|| {
57                ProviderError::Configuration("AppPaths not available in job context".into())
58            })?
59            .as_ref();
60
61        let yaml_path = resolve_yaml_path(ctx, paths.system().services());
62        let override_existing = bool_param(ctx, "override_existing", true);
63        let delete_orphans = bool_param(ctx, "delete_orphans", true);
64
65        tracing::info!(
66            yaml_path = %yaml_path.display(),
67            override_existing,
68            delete_orphans,
69            "access_control_sync job started",
70        );
71
72        let sync = AccessControlLocalSync::new(Arc::<Database>::clone(db_pool), yaml_path);
73        let acl = sync
74            .sync_to_db(override_existing, delete_orphans)
75            .await
76            .map_err(|e| ProviderError::RenderFailed(e.to_string()))?;
77
78        let services = systemprompt_loader::ConfigLoader::load().map_err(|e| {
79            ProviderError::Configuration(format!("Failed to load services config: {e}"))
80        })?;
81        let svc = AccessControlIngestionService::new(db_pool)
82            .map_err(|e| ProviderError::Configuration(e.to_string()))?;
83        let mkt = svc
84            .ingest_marketplace_access(
85                &services.marketplaces,
86                IngestOptions {
87                    override_existing,
88                    delete_orphans,
89                },
90            )
91            .await
92            .map_err(|e| ProviderError::RenderFailed(e.to_string()))?;
93
94        let items_synced = acl.items_synced + mkt.inserted + mkt.updated;
95        let items_skipped = acl.items_skipped + mkt.skipped;
96        let items_deleted = acl.items_deleted + mkt.deleted;
97
98        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
99        tracing::info!(
100            items_synced,
101            items_skipped,
102            items_deleted,
103            duration_ms,
104            "access_control_sync job completed",
105        );
106
107        Ok(JobResult::success()
108            .with_stats(items_synced as u64, acl.errors.len() as u64)
109            .with_duration(duration_ms))
110    }
111}
112
113fn resolve_yaml_path(ctx: &JobContext, services_path: &std::path::Path) -> PathBuf {
114    ctx.parameters().get("yaml_path").map_or_else(
115        || services_path.join(DEFAULT_YAML_RELATIVE),
116        |raw| {
117            let p = std::path::Path::new(raw);
118            if p.is_absolute() {
119                p.to_path_buf()
120            } else {
121                services_path.join(p)
122            }
123        },
124    )
125}
126
127fn bool_param(ctx: &JobContext, key: &str, default: bool) -> bool {
128    ctx.parameters().get(key).map_or(default, |v| {
129        matches!(v.as_str(), "true" | "1" | "yes" | "TRUE" | "True")
130    })
131}
132
// Passes a reference to the unit-struct job to the provider-contracts
// `submit_job!` macro.
// NOTE(review): presumably this is what makes the job discoverable by the
// scheduler — confirm against the macro definition.
systemprompt_provider_contracts::submit_job!(&AccessControlSyncJob);