gsm_subscriptions_teams/
lib.rs

1use std::collections::HashMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::time::Duration;
6
7use anyhow::{Context, Result};
8use greentic_config::ConfigResolver;
9use greentic_config_types::GreenticConfig;
10use greentic_pack::reader::SigningPolicy;
11use greentic_types::{EnvId, PackId};
12use gsm_core::{
13    InMemoryProviderInstallStore, ProviderExtensionsRegistry, ProviderInstallState,
14    ProviderInstallStore, ProviderInstallStoreSnapshot, load_install_store_from_path,
15    load_provider_extensions_from_pack_files,
16};
17use serde_json::{Value, json};
18use time::OffsetDateTime;
19use tracing::{info, warn};
20
21#[derive(Debug, Clone)]
22pub struct WorkerConfig {
23    pub env: EnvId,
24    pub packs_root: PathBuf,
25    pub install_store_path: Option<PathBuf>,
26    pub sync_interval: Duration,
27    pub provision_bin: String,
28    pub dry_run: bool,
29}
30
31impl WorkerConfig {
32    pub fn load() -> Result<Self> {
33        let resolved = ConfigResolver::new().load()?;
34        Ok(Self::from_config(&resolved.config))
35    }
36
37    pub fn from_config(config: &GreenticConfig) -> Self {
38        let packs_root = config.paths.greentic_root.join("packs");
39        let install_store_path = default_install_store_path(config);
40        Self {
41            env: config.environment.env_id.clone(),
42            packs_root,
43            install_store_path,
44            sync_interval: Duration::from_secs(15 * 60),
45            provision_bin: "greentic-provision".to_string(),
46            dry_run: false,
47        }
48    }
49}
50
51pub async fn run_worker(config: WorkerConfig) -> Result<()> {
52    let pack_paths = discover_pack_files(&config.packs_root)?;
53    let extensions = load_provider_extensions_from_pack_files(&config.packs_root, &pack_paths)?;
54    let pack_index = build_pack_index(&pack_paths)?;
55    let store = load_store(&config)?;
56    let mut failures = HashMap::new();
57    let mut interval = tokio::time::interval(config.sync_interval);
58
59    info!(
60        env = %config.env.as_str(),
61        packs = %pack_paths.len(),
62        "subscriptions worker started"
63    );
64
65    loop {
66        interval.tick().await;
67        let updated =
68            run_sync_cycle(&store, &extensions, &pack_index, &config, &mut failures).await?;
69        if updated > 0 {
70            info!(updated, "subscriptions state updated");
71        }
72        persist_store(&config, &store)?;
73    }
74}
75
76fn load_store(config: &WorkerConfig) -> Result<InMemoryProviderInstallStore> {
77    if let Some(path) = config.install_store_path.as_ref() {
78        match load_install_store_from_path(path) {
79            Ok(store) => Ok(store),
80            Err(err) => {
81                warn!(error = %err, path = %path.display(), "failed to load install records");
82                Ok(InMemoryProviderInstallStore::default())
83            }
84        }
85    } else {
86        Ok(InMemoryProviderInstallStore::default())
87    }
88}
89
90fn persist_store(config: &WorkerConfig, store: &InMemoryProviderInstallStore) -> Result<()> {
91    let Some(path) = config.install_store_path.as_ref() else {
92        return Ok(());
93    };
94    let snapshot = ProviderInstallStoreSnapshot {
95        records: Vec::new(),
96        states: store.all(),
97    };
98    let payload = serde_json::to_string_pretty(&snapshot).context("serialize install store")?;
99    if let Some(parent) = path.parent() {
100        fs::create_dir_all(parent).with_context(|| format!("create {}", parent.display()))?;
101    }
102    fs::write(path, payload).with_context(|| format!("write {}", path.display()))?;
103    Ok(())
104}
105
106pub async fn run_sync_cycle(
107    store: &InMemoryProviderInstallStore,
108    extensions: &ProviderExtensionsRegistry,
109    pack_index: &HashMap<PackId, PathBuf>,
110    config: &WorkerConfig,
111    failures: &mut HashMap<InstallKey, FailureState>,
112) -> Result<usize> {
113    let mut updated = 0;
114    let installs = store.all();
115    for state in installs {
116        if !extensions
117            .subscriptions
118            .contains_key(&state.record.provider_id)
119        {
120            continue;
121        }
122        let Some(pack_path) = pack_index.get(&state.record.pack_id) else {
123            warn!(
124                provider_id = %state.record.provider_id,
125                pack_id = %state.record.pack_id,
126                "missing pack for subscriptions sync"
127            );
128            continue;
129        };
130        let key = InstallKey::from_state(&state);
131        if let Some(failure) = failures.get(&key)
132            && failure.next_attempt > std::time::Instant::now()
133            && !config.dry_run
134        {
135            continue;
136        }
137
138        let sync_result = run_provision_sync(state.clone(), pack_path, config).await;
139        match sync_result {
140            Ok(plan) => {
141                let mut state = state;
142                let now = OffsetDateTime::now_utc();
143                state.record.updated_at = now;
144                state.record.subscriptions_state = json!({
145                    "last_sync": now.format(&time::format_description::well_known::Rfc3339)
146                        .unwrap_or_else(|_| now.unix_timestamp().to_string()),
147                    "plan": plan,
148                });
149                clear_subscription_error(&mut state);
150                store.insert(state);
151                failures.remove(&key);
152                updated += 1;
153            }
154            Err(err) => {
155                let failure = failures.entry(key.clone()).or_default();
156                failure.count += 1;
157                failure.bump_backoff();
158                let mut state = state;
159                mark_subscription_error(&mut state, failure.count, err.to_string());
160                store.insert(state);
161                warn!(
162                    provider_id = %key.provider_id,
163                    install_id = %key.install_id,
164                    error = %err,
165                    "subscriptions sync failed"
166                );
167            }
168        }
169    }
170    Ok(updated)
171}
172
173fn mark_subscription_error(state: &mut ProviderInstallState, count: u32, err: String) {
174    if !state.record.metadata.is_object() {
175        state.record.metadata = json!({});
176    }
177    let meta = state
178        .record
179        .metadata
180        .as_object_mut()
181        .expect("metadata object");
182    meta.insert("subscriptions_failure_count".into(), json!(count));
183    meta.insert("subscriptions_last_error".into(), json!(err));
184    meta.insert("subscriptions_degraded".into(), json!(count >= 3));
185    meta.insert(
186        "subscriptions_last_failure".into(),
187        json!(
188            OffsetDateTime::now_utc()
189                .format(&time::format_description::well_known::Rfc3339)
190                .unwrap_or_else(|_| OffsetDateTime::now_utc().unix_timestamp().to_string())
191        ),
192    );
193}
194
195fn clear_subscription_error(state: &mut ProviderInstallState) {
196    if let Some(obj) = state.record.metadata.as_object_mut() {
197        obj.remove("subscriptions_failure_count");
198        obj.remove("subscriptions_last_error");
199        obj.remove("subscriptions_degraded");
200        obj.remove("subscriptions_last_failure");
201    }
202}
203
204async fn run_provision_sync(
205    state: ProviderInstallState,
206    pack_path: &Path,
207    config: &WorkerConfig,
208) -> Result<Value> {
209    let state = state.clone();
210    let pack_path = pack_path.to_path_buf();
211    let config = config.clone();
212    tokio::task::spawn_blocking(move || {
213        let mut cmd = Command::new(&config.provision_bin);
214        cmd.arg("sync-subscriptions")
215            .arg(&state.record.provider_id)
216            .arg("--install-id")
217            .arg(state.record.install_id.to_string())
218            .arg("--pack")
219            .arg(&pack_path)
220            .arg("--env")
221            .arg(state.record.tenant.env.as_str())
222            .arg("--tenant")
223            .arg(state.record.tenant.tenant.as_str())
224            .arg("--public-base-url")
225            .arg(public_base_url(&state));
226        if let Some(team) = state.record.tenant.team.as_ref() {
227            cmd.arg("--team").arg(team.as_str());
228        }
229        if config.dry_run {
230            cmd.arg("--dry-run");
231        }
232        let output = cmd.output().context("run greentic-provision")?;
233        if !output.status.success() {
234            let stderr = String::from_utf8_lossy(&output.stderr);
235            anyhow::bail!("greentic-provision sync-subscriptions failed: {stderr}");
236        }
237        let stdout = String::from_utf8_lossy(&output.stdout);
238        let plan: Value =
239            serde_json::from_str(&stdout).context("parse sync-subscriptions output")?;
240        Ok(plan)
241    })
242    .await
243    .context("sync-subscriptions join")?
244}
245
246fn public_base_url(state: &ProviderInstallState) -> String {
247    state
248        .record
249        .metadata
250        .get("public_base_url")
251        .and_then(Value::as_str)
252        .unwrap_or("https://example.invalid")
253        .to_string()
254}
255
256pub fn discover_pack_files(root: &Path) -> Result<Vec<PathBuf>> {
257    let mut out = Vec::new();
258    discover_pack_files_inner(root, &mut out)?;
259    Ok(out)
260}
261
262fn discover_pack_files_inner(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
263    if !dir.exists() {
264        return Ok(());
265    }
266    for entry in fs::read_dir(dir).with_context(|| format!("read packs dir {}", dir.display()))? {
267        let entry = entry?;
268        let path = entry.path();
269        if path.is_dir() {
270            discover_pack_files_inner(&path, out)?;
271        } else if path.extension().and_then(|s| s.to_str()) == Some("gtpack") {
272            out.push(path);
273        }
274    }
275    Ok(())
276}
277
278pub fn build_pack_index(paths: &[PathBuf]) -> Result<HashMap<PackId, PathBuf>> {
279    let mut out = HashMap::new();
280    for path in paths {
281        let manifest = read_pack_manifest(path)?;
282        out.insert(manifest.pack_id, path.clone());
283    }
284    Ok(out)
285}
286
287fn read_pack_manifest(path: &Path) -> Result<greentic_types::pack_manifest::PackManifest> {
288    let pack = greentic_pack::reader::open_pack(path, SigningPolicy::DevOk)
289        .map_err(|err| anyhow::anyhow!("open pack {}: {err:?}", path.display()))?;
290    let manifest = pack
291        .files
292        .get("manifest.cbor")
293        .context("missing manifest.cbor")?;
294    greentic_types::decode_pack_manifest(manifest).context("decode pack manifest")
295}
296
297#[derive(Debug, Clone, Eq, PartialEq, Hash)]
298pub struct InstallKey {
299    env: String,
300    tenant: String,
301    team: Option<String>,
302    provider_id: String,
303    install_id: String,
304}
305
306impl InstallKey {
307    fn from_state(state: &ProviderInstallState) -> Self {
308        Self {
309            env: state.record.tenant.env.as_str().to_string(),
310            tenant: state.record.tenant.tenant.as_str().to_string(),
311            team: state
312                .record
313                .tenant
314                .team
315                .as_ref()
316                .map(|t| t.as_str().to_string()),
317            provider_id: state.record.provider_id.clone(),
318            install_id: state.record.install_id.to_string(),
319        }
320    }
321}
322
/// Retry bookkeeping for an install whose subscription sync has failed.
#[derive(Debug)]
pub struct FailureState {
    /// Number of recorded failures; incremented by the caller before
    /// `bump_backoff` is invoked.
    count: u32,
    /// Earliest instant at which the next sync attempt is allowed.
    next_attempt: std::time::Instant,
}

impl FailureState {
    /// Schedules the next attempt relative to now.
    ///
    /// The delay is 30 seconds, doubled for every failure beyond the first
    /// (at most ten doublings), and never longer than 30 minutes.
    fn bump_backoff(&mut self) {
        const BASE_SECS: u64 = 30;
        const CAP_SECS: u64 = 30 * 60;
        let doublings = self.count.saturating_sub(1).min(10);
        let wait_secs = (BASE_SECS << doublings).min(CAP_SECS);
        self.next_attempt = std::time::Instant::now() + Duration::from_secs(wait_secs);
    }
}

impl Default for FailureState {
    /// Starts with no failures and an attempt allowed immediately.
    fn default() -> Self {
        let now = std::time::Instant::now();
        Self {
            count: 0,
            next_attempt: now,
        }
    }
}
349
350fn default_install_store_path(config: &GreenticConfig) -> Option<PathBuf> {
351    let path = config
352        .paths
353        .greentic_root
354        .join(".greentic/dev/installs.json");
355    path.exists().then_some(path)
356}