1use std::collections::HashMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::time::Duration;
6
7use anyhow::{Context, Result};
8use greentic_config::ConfigResolver;
9use greentic_config_types::GreenticConfig;
10use greentic_pack::reader::SigningPolicy;
11use greentic_types::{EnvId, PackId};
12use gsm_core::{
13 InMemoryProviderInstallStore, ProviderExtensionsRegistry, ProviderInstallState,
14 ProviderInstallStore, ProviderInstallStoreSnapshot, load_install_store_from_path,
15 load_provider_extensions_from_pack_files,
16};
17use serde_json::{Value, json};
18use time::OffsetDateTime;
19use tracing::{info, warn};
20
21#[derive(Debug, Clone)]
22pub struct WorkerConfig {
23 pub env: EnvId,
24 pub packs_root: PathBuf,
25 pub install_store_path: Option<PathBuf>,
26 pub sync_interval: Duration,
27 pub provision_bin: String,
28 pub dry_run: bool,
29}
30
31impl WorkerConfig {
32 pub fn load() -> Result<Self> {
33 let resolved = ConfigResolver::new().load()?;
34 Ok(Self::from_config(&resolved.config))
35 }
36
37 pub fn from_config(config: &GreenticConfig) -> Self {
38 let packs_root = config.paths.greentic_root.join("packs");
39 let install_store_path = default_install_store_path(config);
40 Self {
41 env: config.environment.env_id.clone(),
42 packs_root,
43 install_store_path,
44 sync_interval: Duration::from_secs(15 * 60),
45 provision_bin: "greentic-provision".to_string(),
46 dry_run: false,
47 }
48 }
49}
50
51pub async fn run_worker(config: WorkerConfig) -> Result<()> {
52 let pack_paths = discover_pack_files(&config.packs_root)?;
53 let extensions = load_provider_extensions_from_pack_files(&config.packs_root, &pack_paths)?;
54 let pack_index = build_pack_index(&pack_paths)?;
55 let store = load_store(&config)?;
56 let mut failures = HashMap::new();
57 let mut interval = tokio::time::interval(config.sync_interval);
58
59 info!(
60 env = %config.env.as_str(),
61 packs = %pack_paths.len(),
62 "subscriptions worker started"
63 );
64
65 loop {
66 interval.tick().await;
67 let updated =
68 run_sync_cycle(&store, &extensions, &pack_index, &config, &mut failures).await?;
69 if updated > 0 {
70 info!(updated, "subscriptions state updated");
71 }
72 persist_store(&config, &store)?;
73 }
74}
75
76fn load_store(config: &WorkerConfig) -> Result<InMemoryProviderInstallStore> {
77 if let Some(path) = config.install_store_path.as_ref() {
78 match load_install_store_from_path(path) {
79 Ok(store) => Ok(store),
80 Err(err) => {
81 warn!(error = %err, path = %path.display(), "failed to load install records");
82 Ok(InMemoryProviderInstallStore::default())
83 }
84 }
85 } else {
86 Ok(InMemoryProviderInstallStore::default())
87 }
88}
89
90fn persist_store(config: &WorkerConfig, store: &InMemoryProviderInstallStore) -> Result<()> {
91 let Some(path) = config.install_store_path.as_ref() else {
92 return Ok(());
93 };
94 let snapshot = ProviderInstallStoreSnapshot {
95 records: Vec::new(),
96 states: store.all(),
97 };
98 let payload = serde_json::to_string_pretty(&snapshot).context("serialize install store")?;
99 if let Some(parent) = path.parent() {
100 fs::create_dir_all(parent).with_context(|| format!("create {}", parent.display()))?;
101 }
102 fs::write(path, payload).with_context(|| format!("write {}", path.display()))?;
103 Ok(())
104}
105
106pub async fn run_sync_cycle(
107 store: &InMemoryProviderInstallStore,
108 extensions: &ProviderExtensionsRegistry,
109 pack_index: &HashMap<PackId, PathBuf>,
110 config: &WorkerConfig,
111 failures: &mut HashMap<InstallKey, FailureState>,
112) -> Result<usize> {
113 let mut updated = 0;
114 let installs = store.all();
115 for state in installs {
116 if !extensions
117 .subscriptions
118 .contains_key(&state.record.provider_id)
119 {
120 continue;
121 }
122 let Some(pack_path) = pack_index.get(&state.record.pack_id) else {
123 warn!(
124 provider_id = %state.record.provider_id,
125 pack_id = %state.record.pack_id,
126 "missing pack for subscriptions sync"
127 );
128 continue;
129 };
130 let key = InstallKey::from_state(&state);
131 if let Some(failure) = failures.get(&key)
132 && failure.next_attempt > std::time::Instant::now()
133 && !config.dry_run
134 {
135 continue;
136 }
137
138 let sync_result = run_provision_sync(state.clone(), pack_path, config).await;
139 match sync_result {
140 Ok(plan) => {
141 let mut state = state;
142 let now = OffsetDateTime::now_utc();
143 state.record.updated_at = now;
144 state.record.subscriptions_state = json!({
145 "last_sync": now.format(&time::format_description::well_known::Rfc3339)
146 .unwrap_or_else(|_| now.unix_timestamp().to_string()),
147 "plan": plan,
148 });
149 clear_subscription_error(&mut state);
150 store.insert(state);
151 failures.remove(&key);
152 updated += 1;
153 }
154 Err(err) => {
155 let failure = failures.entry(key.clone()).or_default();
156 failure.count += 1;
157 failure.bump_backoff();
158 let mut state = state;
159 mark_subscription_error(&mut state, failure.count, err.to_string());
160 store.insert(state);
161 warn!(
162 provider_id = %key.provider_id,
163 install_id = %key.install_id,
164 error = %err,
165 "subscriptions sync failed"
166 );
167 }
168 }
169 }
170 Ok(updated)
171}
172
173fn mark_subscription_error(state: &mut ProviderInstallState, count: u32, err: String) {
174 if !state.record.metadata.is_object() {
175 state.record.metadata = json!({});
176 }
177 let meta = state
178 .record
179 .metadata
180 .as_object_mut()
181 .expect("metadata object");
182 meta.insert("subscriptions_failure_count".into(), json!(count));
183 meta.insert("subscriptions_last_error".into(), json!(err));
184 meta.insert("subscriptions_degraded".into(), json!(count >= 3));
185 meta.insert(
186 "subscriptions_last_failure".into(),
187 json!(
188 OffsetDateTime::now_utc()
189 .format(&time::format_description::well_known::Rfc3339)
190 .unwrap_or_else(|_| OffsetDateTime::now_utc().unix_timestamp().to_string())
191 ),
192 );
193}
194
195fn clear_subscription_error(state: &mut ProviderInstallState) {
196 if let Some(obj) = state.record.metadata.as_object_mut() {
197 obj.remove("subscriptions_failure_count");
198 obj.remove("subscriptions_last_error");
199 obj.remove("subscriptions_degraded");
200 obj.remove("subscriptions_last_failure");
201 }
202}
203
204async fn run_provision_sync(
205 state: ProviderInstallState,
206 pack_path: &Path,
207 config: &WorkerConfig,
208) -> Result<Value> {
209 let state = state.clone();
210 let pack_path = pack_path.to_path_buf();
211 let config = config.clone();
212 tokio::task::spawn_blocking(move || {
213 let mut cmd = Command::new(&config.provision_bin);
214 cmd.arg("sync-subscriptions")
215 .arg(&state.record.provider_id)
216 .arg("--install-id")
217 .arg(state.record.install_id.to_string())
218 .arg("--pack")
219 .arg(&pack_path)
220 .arg("--env")
221 .arg(state.record.tenant.env.as_str())
222 .arg("--tenant")
223 .arg(state.record.tenant.tenant.as_str())
224 .arg("--public-base-url")
225 .arg(public_base_url(&state));
226 if let Some(team) = state.record.tenant.team.as_ref() {
227 cmd.arg("--team").arg(team.as_str());
228 }
229 if config.dry_run {
230 cmd.arg("--dry-run");
231 }
232 let output = cmd.output().context("run greentic-provision")?;
233 if !output.status.success() {
234 let stderr = String::from_utf8_lossy(&output.stderr);
235 anyhow::bail!("greentic-provision sync-subscriptions failed: {stderr}");
236 }
237 let stdout = String::from_utf8_lossy(&output.stdout);
238 let plan: Value =
239 serde_json::from_str(&stdout).context("parse sync-subscriptions output")?;
240 Ok(plan)
241 })
242 .await
243 .context("sync-subscriptions join")?
244}
245
246fn public_base_url(state: &ProviderInstallState) -> String {
247 state
248 .record
249 .metadata
250 .get("public_base_url")
251 .and_then(Value::as_str)
252 .unwrap_or("https://example.invalid")
253 .to_string()
254}
255
256pub fn discover_pack_files(root: &Path) -> Result<Vec<PathBuf>> {
257 let mut out = Vec::new();
258 discover_pack_files_inner(root, &mut out)?;
259 Ok(out)
260}
261
262fn discover_pack_files_inner(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
263 if !dir.exists() {
264 return Ok(());
265 }
266 for entry in fs::read_dir(dir).with_context(|| format!("read packs dir {}", dir.display()))? {
267 let entry = entry?;
268 let path = entry.path();
269 if path.is_dir() {
270 discover_pack_files_inner(&path, out)?;
271 } else if path.extension().and_then(|s| s.to_str()) == Some("gtpack") {
272 out.push(path);
273 }
274 }
275 Ok(())
276}
277
278pub fn build_pack_index(paths: &[PathBuf]) -> Result<HashMap<PackId, PathBuf>> {
279 let mut out = HashMap::new();
280 for path in paths {
281 let manifest = read_pack_manifest(path)?;
282 out.insert(manifest.pack_id, path.clone());
283 }
284 Ok(out)
285}
286
287fn read_pack_manifest(path: &Path) -> Result<greentic_types::pack_manifest::PackManifest> {
288 let pack = greentic_pack::reader::open_pack(path, SigningPolicy::DevOk)
289 .map_err(|err| anyhow::anyhow!("open pack {}: {err:?}", path.display()))?;
290 let manifest = pack
291 .files
292 .get("manifest.cbor")
293 .context("missing manifest.cbor")?;
294 greentic_types::decode_pack_manifest(manifest).context("decode pack manifest")
295}
296
/// Hashable identity of one provider install, used to key per-install
/// failure/back-off state.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct InstallKey {
    /// Environment the install's tenant belongs to.
    env: String,
    /// Tenant identifier.
    tenant: String,
    /// Team identifier, when the install's tenant context carries one.
    team: Option<String>,
    /// Provider the install belongs to.
    provider_id: String,
    /// Install identifier rendered as a string.
    install_id: String,
}
305
306impl InstallKey {
307 fn from_state(state: &ProviderInstallState) -> Self {
308 Self {
309 env: state.record.tenant.env.as_str().to_string(),
310 tenant: state.record.tenant.tenant.as_str().to_string(),
311 team: state
312 .record
313 .tenant
314 .team
315 .as_ref()
316 .map(|t| t.as_str().to_string()),
317 provider_id: state.record.provider_id.clone(),
318 install_id: state.record.install_id.to_string(),
319 }
320 }
321}
322
/// Consecutive-failure bookkeeping for one install.
#[derive(Debug)]
pub struct FailureState {
    /// Number of failures recorded so far.
    count: u32,
    /// Earliest instant at which the next attempt is allowed.
    next_attempt: std::time::Instant,
}

impl FailureState {
    /// Delay applied after the first failure.
    const BASE_DELAY: Duration = Duration::from_secs(30);
    /// Upper bound on the delay between attempts.
    const MAX_DELAY: Duration = Duration::from_secs(30 * 60);
    /// Cap on the number of doublings, which keeps the multiplier within `u32`.
    const MAX_DOUBLINGS: u32 = 10;

    /// Schedules the next attempt using exponential back-off.
    ///
    /// The delay is `30s * 2^(count - 1)`, capped at 30 minutes, measured from
    /// now. The caller is expected to have updated `count` beforehand.
    fn bump_backoff(&mut self) {
        let doublings = self.count.saturating_sub(1).min(Self::MAX_DOUBLINGS);
        let multiplier = 2u32.pow(doublings);
        let delay = std::cmp::min(
            Self::BASE_DELAY.saturating_mul(multiplier),
            Self::MAX_DELAY,
        );
        self.next_attempt = std::time::Instant::now() + delay;
    }
}

impl Default for FailureState {
    /// Starts with no failures and an attempt allowed immediately.
    fn default() -> Self {
        let now = std::time::Instant::now();
        Self {
            count: 0,
            next_attempt: now,
        }
    }
}
349
350fn default_install_store_path(config: &GreenticConfig) -> Option<PathBuf> {
351 let path = config
352 .paths
353 .greentic_root
354 .join(".greentic/dev/installs.json");
355 path.exists().then_some(path)
356}