1use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Arc;
9
10use anyhow::{Result, anyhow};
11use greentic_state::redis_store::RedisStateStore;
12
13use crate::capabilities::ResolveScope;
14use crate::demo::runner_host::{DemoRunnerHost, OperatorContext};
15use crate::domains::Domain;
16use crate::operator_i18n;
17use crate::operator_log;
18use crate::secrets_gate::SecretsManagerHandle;
19use crate::secrets_setup::{SecretsSetup, resolve_env};
20use greentic_runner_host::storage::DynStateStore;
21
/// How strongly a capability is expected to be available for the selected domains.
///
/// `log_capability_bootstrap_report` collects unresolved capabilities into two separate
/// lists based on this value and reports each list with its own message.
enum CapabilityPriority {
    /// Unresolved capabilities of this kind are reported as "missing required".
    Required,
    /// Unresolved capabilities of this kind are reported as "missing recommended".
    Recommended,
}
30
/// A capability the operator expects to resolve, together with how important it is.
struct CapabilityExpectation {
    /// Capability identifier passed to `resolve_capability`
    /// (for example `greentic.cap.messaging.provider.v1`).
    cap_id: &'static str,
    /// Decides which "missing" list the capability lands in when it cannot be resolved.
    priority: CapabilityPriority,
}
35
36fn capability_expectations_for_domains(domains: &[Domain]) -> Vec<CapabilityExpectation> {
37 let mut out = Vec::new();
38 let has_messaging = domains.contains(&Domain::Messaging);
39 let has_events = domains.contains(&Domain::Events);
40 let has_secrets = domains.contains(&Domain::Secrets);
41
42 if has_messaging {
43 out.push(CapabilityExpectation {
44 cap_id: "greentic.cap.messaging.provider.v1",
45 priority: CapabilityPriority::Required,
46 });
47 }
48 if has_events {
49 out.push(CapabilityExpectation {
50 cap_id: "greentic.cap.events.provider.v1",
51 priority: CapabilityPriority::Required,
52 });
53 }
54 if has_secrets {
55 out.push(CapabilityExpectation {
56 cap_id: "greentic.cap.secrets.store.v1",
57 priority: CapabilityPriority::Required,
58 });
59 }
60 if has_messaging || has_events {
61 out.push(CapabilityExpectation {
62 cap_id: "greentic.cap.oauth.broker.v1",
63 priority: CapabilityPriority::Recommended,
64 });
65 out.push(CapabilityExpectation {
66 cap_id: "greentic.cap.mcp.exec.v1",
67 priority: CapabilityPriority::Recommended,
68 });
69 }
70
71 out
72}
73
74pub fn log_capability_bootstrap_report(
76 runner_host: &DemoRunnerHost,
77 ctx: &OperatorContext,
78 domains: &[Domain],
79) {
80 let scope = ResolveScope {
81 env: std::env::var("GREENTIC_ENV").ok(),
82 tenant: Some(ctx.tenant.clone()),
83 team: ctx.team.clone(),
84 };
85 let expectations = capability_expectations_for_domains(domains);
86 let mut missing_required = Vec::new();
87 let mut missing_recommended = Vec::new();
88 for item in &expectations {
89 let resolved = runner_host.resolve_capability(item.cap_id, None, scope.clone());
90 if resolved.is_none()
91 && item.cap_id == "greentic.cap.secrets.store.v1"
92 && domains.contains(&Domain::Secrets)
93 && runner_host.has_provider_packs_for_domain(Domain::Secrets)
94 {
95 operator_log::info(
96 module_path!(),
97 "capability bootstrap: using legacy secrets providers fallback for greentic.cap.secrets.store.v1",
98 );
99 continue;
100 }
101 if resolved.is_none() {
102 match item.priority {
103 CapabilityPriority::Required => missing_required.push(item.cap_id.to_string()),
104 CapabilityPriority::Recommended => {
105 missing_recommended.push(item.cap_id.to_string())
106 }
107 }
108 }
109 }
110
111 let pending_setup = runner_host.capability_setup_plan(ctx);
112 if pending_setup.is_empty() {
113 operator_log::info(
114 module_path!(),
115 "capability setup plan: no capabilities requiring setup found",
116 );
117 } else {
118 let ids = pending_setup
119 .iter()
120 .map(|binding| format!("{}@{}", binding.cap_id, binding.stable_id))
121 .collect::<Vec<_>>()
122 .join(", ");
123 operator_log::info(
124 module_path!(),
125 format!(
126 "capability setup plan pending={} [{}]",
127 pending_setup.len(),
128 ids
129 ),
130 );
131 }
132
133 if !missing_required.is_empty() {
134 let joined = missing_required.join(", ");
135 operator_log::warn(
136 module_path!(),
137 format!("missing required capabilities for setup/start: {joined}"),
138 );
139 eprintln!(
140 "{}",
141 operator_i18n::trf(
142 "cli.capability.bootstrap.missing_required",
143 "Warning: missing required capabilities: {}",
144 &[&joined]
145 )
146 );
147 }
148 if !missing_recommended.is_empty() {
149 let joined = missing_recommended.join(", ");
150 operator_log::warn(
151 module_path!(),
152 format!("missing recommended capabilities for setup/start: {joined}"),
153 );
154 eprintln!(
155 "{}",
156 operator_i18n::trf(
157 "cli.capability.bootstrap.missing_recommended",
158 "Note: missing recommended capabilities: {}",
159 &[&joined]
160 )
161 );
162 }
163}
164
/// Capability id that `try_upgrade_telemetry` resolves and invokes.
const CAP_TELEMETRY_V1: &str = "greentic.cap.telemetry.v1";
/// Operation invoked on the telemetry capability; its output is decoded as the provider config.
const TELEMETRY_CONFIGURE_OP: &str = "telemetry.configure";
/// Service name used when the provider config supplies none, or only whitespace.
const DEFAULT_TELEMETRY_SERVICE_NAME: &str = "greentic-operator";
172
/// Telemetry settings decoded from the JSON output of the `telemetry.configure` operation.
///
/// Every field carries `#[serde(default)]`, so any of them may be absent from the payload.
#[derive(Debug, Clone, serde::Deserialize)]
struct LegacyTelemetryProviderConfig {
    /// Service name; a missing or blank value falls back to `DEFAULT_TELEMETRY_SERVICE_NAME`.
    #[serde(default)]
    service_name: Option<String>,
    /// Export mode name, interpreted by `parse_export_mode`.
    #[serde(default)]
    export_mode: Option<String>,
    /// Cloud preset name, interpreted by `parse_cloud_preset`.
    #[serde(default)]
    preset: Option<String>,
    /// Export endpoint; takes precedence over `otlp_endpoint` when both are non-blank.
    #[serde(default)]
    endpoint: Option<String>,
    /// Alternative spelling of the endpoint, used when `endpoint` is missing or blank.
    // NOTE(review): the alias equals the field's own name, so it accepts no extra key —
    // possibly a leftover; confirm before removing.
    #[serde(default, alias = "otlp_endpoint")]
    otlp_endpoint: Option<String>,
    /// Export headers; on key collisions these win over `otlp_headers`.
    #[serde(default)]
    headers: HashMap<String, String>,
    /// Alternative spelling of the headers, merged underneath `headers`.
    // NOTE(review): the alias equals the field's own name, so it accepts no extra key —
    // possibly a leftover; confirm before removing.
    #[serde(default, alias = "otlp_headers")]
    otlp_headers: HashMap<String, String>,
    /// Trace sampling ratio; `to_runtime_config` rejects values outside `0.0..=1.0`.
    #[serde(default)]
    sampling_ratio: Option<f64>,
    /// Compression name, interpreted by `parse_compression`.
    #[serde(default)]
    compression: Option<String>,
}
194
195impl LegacyTelemetryProviderConfig {
196 fn service_name(&self) -> &str {
197 self.service_name
198 .as_deref()
199 .filter(|value| !value.trim().is_empty())
200 .unwrap_or(DEFAULT_TELEMETRY_SERVICE_NAME)
201 }
202
203 fn endpoint(&self) -> Option<&str> {
204 self.endpoint
205 .as_deref()
206 .filter(|value| !value.trim().is_empty())
207 .or_else(|| {
208 self.otlp_endpoint
209 .as_deref()
210 .filter(|value| !value.trim().is_empty())
211 })
212 }
213
214 fn merged_headers(&self) -> HashMap<String, String> {
215 let mut headers = self.otlp_headers.clone();
216 headers.extend(self.headers.clone());
217 headers
218 }
219
220 fn to_runtime_config(
221 &self,
222 ) -> Result<(
223 greentic_telemetry::TelemetryConfig,
224 greentic_telemetry::export::ExportConfig,
225 )> {
226 use greentic_telemetry::export::{
227 Compression as RuntimeCompression, ExportConfig, ExportMode,
228 Sampling as RuntimeSampling,
229 };
230
231 let preset = self.preset.as_deref().map(parse_cloud_preset).transpose()?;
232 let preset_config = preset
233 .map(greentic_telemetry::presets::load_preset)
234 .transpose()?
235 .unwrap_or_default();
236
237 let mode = if let Some(export_mode) = self.export_mode.as_deref() {
238 parse_export_mode(export_mode)?
239 } else {
240 preset_config.export_mode.unwrap_or(ExportMode::JsonStdout)
241 };
242
243 let endpoint = self
244 .endpoint()
245 .map(str::to_owned)
246 .or(preset_config.otlp_endpoint);
247
248 let mut headers = preset_config.otlp_headers;
249 headers.extend(self.merged_headers());
250
251 let sampling = match self.sampling_ratio {
252 Some(ratio) if !(0.0..=1.0).contains(&ratio) => {
253 return Err(anyhow!(
254 "telemetry.configure returned sampling_ratio outside 0.0..=1.0: {ratio}"
255 ));
256 }
257 Some(ratio) if ratio <= 0.0 => RuntimeSampling::AlwaysOff,
258 Some(ratio) if ratio >= 1.0 => RuntimeSampling::AlwaysOn,
259 Some(ratio) => RuntimeSampling::TraceIdRatio(ratio),
260 None => RuntimeSampling::Parent,
261 };
262
263 let compression = self
264 .compression
265 .as_deref()
266 .map(parse_compression)
267 .transpose()?;
268
269 Ok((
270 greentic_telemetry::TelemetryConfig {
271 service_name: self.service_name().to_string(),
272 },
273 ExportConfig {
274 mode,
275 endpoint,
276 headers,
277 sampling,
278 compression: compression.map(|value| match value {
279 CompressionCompat::Gzip => RuntimeCompression::Gzip,
280 }),
281 },
282 ))
283 }
284}
285
/// Compression choices accepted from the provider config.
///
/// Produced by `parse_compression` and mapped onto the runtime compression type in
/// `LegacyTelemetryProviderConfig::to_runtime_config`.
#[derive(Clone, Copy, Debug)]
enum CompressionCompat {
    /// Selected by the value `gzip` (case-insensitive, surrounding whitespace ignored).
    Gzip,
}
290
291fn parse_export_mode(value: &str) -> Result<greentic_telemetry::export::ExportMode> {
292 use greentic_telemetry::export::ExportMode;
293
294 match value.trim().to_ascii_lowercase().as_str() {
295 "json-stdout" | "json_stdout" | "stdout" => Ok(ExportMode::JsonStdout),
296 "otlp-grpc" | "otlp_grpc" => Ok(ExportMode::OtlpGrpc),
297 "otlp-http" | "otlp_http" => Ok(ExportMode::OtlpHttp),
298 other => Err(anyhow!("unsupported telemetry export_mode '{other}'")),
299 }
300}
301
302fn parse_cloud_preset(value: &str) -> Result<greentic_telemetry::presets::CloudPreset> {
303 use greentic_telemetry::presets::CloudPreset;
304
305 match value.trim().to_ascii_lowercase().as_str() {
306 "aws" => Ok(CloudPreset::Aws),
307 "gcp" => Ok(CloudPreset::Gcp),
308 "azure" => Ok(CloudPreset::Azure),
309 "datadog" => Ok(CloudPreset::Datadog),
310 "loki" => Ok(CloudPreset::Loki),
311 "none" => Ok(CloudPreset::None),
312 other => Err(anyhow!("unsupported telemetry preset '{other}'")),
313 }
314}
315
316fn parse_compression(value: &str) -> Result<CompressionCompat> {
317 match value.trim().to_ascii_lowercase().as_str() {
318 "gzip" => Ok(CompressionCompat::Gzip),
319 other => Err(anyhow!("unsupported telemetry compression '{other}'")),
320 }
321}
322
323fn validate_telemetry_config(config: &LegacyTelemetryProviderConfig) -> Vec<String> {
324 let mut warnings = Vec::new();
325
326 if config.export_mode.is_none() && config.preset.is_none() {
327 warnings.push(
328 "telemetry.configure returned no export_mode or preset; defaulting to json-stdout"
329 .to_string(),
330 );
331 }
332
333 if matches!(
334 config.export_mode
335 .as_deref()
336 .map(|value| value.trim().to_ascii_lowercase()),
337 Some(mode) if mode == "otlp-grpc" || mode == "otlp_grpc" || mode == "otlp-http" || mode == "otlp_http"
338 ) && config.endpoint().is_none()
339 && config.preset.is_none()
340 {
341 warnings.push(
342 "telemetry.configure returned OTLP mode without endpoint or preset; runtime defaults will be used"
343 .to_string(),
344 );
345 }
346
347 warnings
348}
349
350pub fn try_upgrade_telemetry(
360 bundle: &Path,
361 runner_host: &DemoRunnerHost,
362 tenant: &str,
363 team: Option<&str>,
364 env_override: Option<&str>,
365 setup_answers: Option<&serde_json::Value>,
366) -> Result<bool> {
367 let env = resolve_env(env_override);
368 let scope = ResolveScope {
369 env: Some(env.clone()),
370 tenant: Some(tenant.to_string()),
371 team: team.map(|t| t.to_string()),
372 };
373
374 let Some(binding) = runner_host.resolve_capability(CAP_TELEMETRY_V1, None, scope) else {
376 tracing::debug!("no telemetry capability found — skipping upgrade");
377 return Ok(false);
378 };
379 tracing::info!(
380 pack_id = %binding.pack_id,
381 stable_id = %binding.stable_id,
382 "resolved telemetry capability"
383 );
384
385 if let Ok(secrets_setup) = SecretsSetup::new(bundle, &env, tenant, team) {
387 if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
388 .enable_all()
389 .build()
390 {
391 if let Err(e) =
392 rt.block_on(secrets_setup.ensure_pack_secrets(&binding.pack_path, &binding.pack_id))
393 {
394 tracing::warn!(
395 pack_id = %binding.pack_id,
396 error = %e,
397 "telemetry capability secret seeding failed"
398 );
399 }
400 }
401 }
402
403 if let Some(answers) = setup_answers {
409 if answers.as_object().is_some_and(|m| !m.is_empty()) {
410 let pack_path_ref = Some(binding.pack_path.as_path());
411 let persist_rt = tokio::runtime::Builder::new_current_thread()
412 .enable_all()
413 .build();
414 if let Ok(rt) = persist_rt {
415 match rt.block_on(crate::qa_persist::persist_all_config_as_secrets(
416 bundle,
417 &env,
418 tenant,
419 team,
420 &binding.pack_id,
421 answers,
422 pack_path_ref,
423 )) {
424 Ok(saved) if !saved.is_empty() => {
425 tracing::info!(
426 pack_id = %binding.pack_id,
427 count = saved.len(),
428 keys = ?saved,
429 "persisted telemetry setup answers as secrets"
430 );
431 }
432 Err(e) => {
433 tracing::warn!(
434 pack_id = %binding.pack_id,
435 error = %e,
436 "failed to persist telemetry setup answers"
437 );
438 }
439 _ => {}
440 }
441 }
442 }
443 }
444
445 let ctx = OperatorContext {
450 tenant: tenant.to_string(),
451 team: team.map(|t| t.to_string()),
452 correlation_id: None,
453 };
454 if let Err(e) = runner_host.mark_capability_ready(&ctx, &binding) {
455 tracing::warn!(error = %e, "failed to mark telemetry capability as ready (non-fatal)");
456 }
457
458 let payload = serde_json::json!({});
460 let payload_bytes = serde_json::to_vec(&payload)?;
461
462 let outcome = runner_host.invoke_capability(
463 CAP_TELEMETRY_V1,
464 TELEMETRY_CONFIGURE_OP,
465 &payload_bytes,
466 &ctx,
467 )?;
468
469 if !outcome.success {
470 let error_msg = outcome.error.unwrap_or_else(|| "unknown error".to_string());
471 tracing::warn!(error = %error_msg, "telemetry.configure capability invocation failed");
472 return Ok(false);
473 }
474
475 let raw_output = match outcome.output {
477 Some(value) => value,
478 None => {
479 tracing::warn!("telemetry.configure returned no output");
480 return Ok(false);
481 }
482 };
483
484 tracing::debug!(config = %raw_output, "telemetry provider config received");
485
486 let config_json = if let Some(inner) = raw_output.get("output") {
489 inner.clone()
490 } else {
491 raw_output
492 };
493
494 let config: LegacyTelemetryProviderConfig = serde_json::from_value(config_json)?;
495
496 let warnings = validate_telemetry_config(&config);
498 for warning in &warnings {
499 tracing::warn!(warning = %warning, "telemetry config validation");
500 }
501
502 let (telemetry_config, export_config) = config.to_runtime_config()?;
504 greentic_telemetry::init_telemetry_from_config(telemetry_config, export_config)?;
505
506 tracing::info!(
507 export_mode = ?config.export_mode,
508 preset = ?config.preset,
509 endpoint = ?config.endpoint(),
510 sampling_ratio = config.sampling_ratio,
511 "telemetry upgraded from capability provider"
512 );
513
514 Ok(true)
515}
516
/// Capability id that `try_upgrade_state_store` resolves to decide whether to use Redis.
const CAP_STATE_KV_V1: &str = "greentic.cap.state.kv.v1";
522
523pub fn try_upgrade_state_store(
532 bundle: &Path,
533 runner_host: &DemoRunnerHost,
534 secrets_handle: &SecretsManagerHandle,
535 tenant: &str,
536 team: Option<&str>,
537 env_override: Option<&str>,
538 setup_answers: Option<&serde_json::Value>,
539) -> Result<Option<DynStateStore>> {
540 let env = resolve_env(env_override);
541 let scope = ResolveScope {
542 env: Some(env.clone()),
543 tenant: Some(tenant.to_string()),
544 team: team.map(|t| t.to_string()),
545 };
546
547 let Some(binding) = runner_host.resolve_capability(CAP_STATE_KV_V1, None, scope) else {
549 eprintln!(
550 "[state-store] no capability '{}' found — using in-memory",
551 CAP_STATE_KV_V1
552 );
553 return Ok(None);
554 };
555 eprintln!(
556 "[state-store] resolved capability: pack_id={} stable_id={}",
557 binding.pack_id, binding.stable_id
558 );
559
560 if let Ok(secrets_setup) = SecretsSetup::new(bundle, &env, tenant, team) {
562 if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
563 .enable_all()
564 .build()
565 {
566 if let Err(e) =
567 rt.block_on(secrets_setup.ensure_pack_secrets(&binding.pack_path, &binding.pack_id))
568 {
569 tracing::warn!(
570 pack_id = %binding.pack_id,
571 error = %e,
572 "state capability secret seeding failed"
573 );
574 }
575 }
576 }
577
578 if let Some(answers) = setup_answers {
581 if answers.as_object().is_some_and(|m| !m.is_empty()) {
582 let pack_path_ref = Some(binding.pack_path.as_path());
583 let persist_rt = tokio::runtime::Builder::new_current_thread()
584 .enable_all()
585 .build();
586 if let Ok(rt) = persist_rt {
587 match rt.block_on(crate::qa_persist::persist_all_config_as_secrets(
588 bundle,
589 &env,
590 tenant,
591 team,
592 &binding.pack_id,
593 answers,
594 pack_path_ref,
595 )) {
596 Ok(saved) if !saved.is_empty() => {
597 tracing::info!(
598 pack_id = %binding.pack_id,
599 count = saved.len(),
600 keys = ?saved,
601 "persisted state-redis setup answers as secrets"
602 );
603 }
604 Err(e) => {
605 tracing::warn!(
606 pack_id = %binding.pack_id,
607 error = %e,
608 "failed to persist state-redis setup answers"
609 );
610 }
611 _ => {}
612 }
613 }
614 }
615 }
616
617 let canonical_team = crate::secrets_manager::canonical_team(team);
619 let secret_uri = format!(
620 "secrets://{}/{}/{}/{}/redis_url",
621 env, tenant, canonical_team, binding.pack_id
622 );
623
624 eprintln!("[state-store] reading secret: {}", secret_uri);
625 let redis_url = {
626 let rt = tokio::runtime::Builder::new_current_thread()
627 .enable_all()
628 .build()?;
629 let manager = secrets_handle.manager();
630 match rt.block_on(manager.read(&secret_uri)) {
631 Ok(bytes) => {
632 let url = String::from_utf8(bytes).ok();
633 eprintln!(
634 "[state-store] redis_url secret found (len={})",
635 url.as_ref().map_or(0, |s| s.len())
636 );
637 url
638 }
639 Err(e) => {
640 eprintln!("[state-store] failed to read redis_url secret: {e}");
641 None
642 }
643 }
644 };
645
646 let Some(redis_url) = redis_url else {
647 match std::env::var("REDIS_URL") {
649 Ok(url) => {
650 tracing::info!("using REDIS_URL environment variable for state store");
651 return create_redis_store(&url);
652 }
653 Err(_) => {
654 tracing::warn!(
655 "redis_url secret not found and REDIS_URL env not set — using in-memory state store"
656 );
657 return Ok(None);
658 }
659 }
660 };
661
662 create_redis_store(&redis_url)
663}
664
665fn create_redis_store(redis_url: &str) -> Result<Option<DynStateStore>> {
666 match RedisStateStore::from_url(redis_url) {
667 Ok(store) => {
668 let store: DynStateStore = Arc::new(store);
669 eprintln!("[state-store] ✓ upgraded to Redis: {}", redis_url);
670 tracing::info!(
671 redis_url = %redis_url,
672 "state store upgraded to Redis"
673 );
674 Ok(Some(store))
675 }
676 Err(e) => {
677 eprintln!("[state-store] ✗ failed to create Redis store: {e}");
678 tracing::warn!(
679 error = %e,
680 "failed to create Redis state store — using in-memory fallback"
681 );
682 Ok(None)
683 }
684 }
685}