// greentic_operator/capability_bootstrap.rs

//! Capability bootstrap: discovery report, telemetry upgrade, and state store upgrade.
//!
//! Extracted from `cli.rs` to keep the CLI module focused on argument
//! parsing and command dispatch.
5
6use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Arc;
9
10use anyhow::{Result, anyhow};
11use greentic_state::redis_store::RedisStateStore;
12
13use crate::capabilities::ResolveScope;
14use crate::demo::runner_host::{DemoRunnerHost, OperatorContext};
15use crate::domains::Domain;
16use crate::operator_i18n;
17use crate::operator_log;
18use crate::secrets_gate::SecretsManagerHandle;
19use crate::secrets_setup::{SecretsSetup, resolve_env};
20use greentic_runner_host::storage::DynStateStore;
21
// ---------------------------------------------------------------------------
// Capability expectations & bootstrap report
// ---------------------------------------------------------------------------
25
/// How an unresolved capability is reported by the bootstrap report.
enum CapabilityPriority {
    /// Reported on stderr as a "Warning: missing required capabilities" line.
    Required,
    /// Reported on stderr as a "Note: missing recommended capabilities" line.
    Recommended,
}
30
31struct CapabilityExpectation {
32    cap_id: &'static str,
33    priority: CapabilityPriority,
34}
35
36fn capability_expectations_for_domains(domains: &[Domain]) -> Vec<CapabilityExpectation> {
37    let mut out = Vec::new();
38    let has_messaging = domains.contains(&Domain::Messaging);
39    let has_events = domains.contains(&Domain::Events);
40    let has_secrets = domains.contains(&Domain::Secrets);
41
42    if has_messaging {
43        out.push(CapabilityExpectation {
44            cap_id: "greentic.cap.messaging.provider.v1",
45            priority: CapabilityPriority::Required,
46        });
47    }
48    if has_events {
49        out.push(CapabilityExpectation {
50            cap_id: "greentic.cap.events.provider.v1",
51            priority: CapabilityPriority::Required,
52        });
53    }
54    if has_secrets {
55        out.push(CapabilityExpectation {
56            cap_id: "greentic.cap.secrets.store.v1",
57            priority: CapabilityPriority::Required,
58        });
59    }
60    if has_messaging || has_events {
61        out.push(CapabilityExpectation {
62            cap_id: "greentic.cap.oauth.broker.v1",
63            priority: CapabilityPriority::Recommended,
64        });
65        out.push(CapabilityExpectation {
66            cap_id: "greentic.cap.mcp.exec.v1",
67            priority: CapabilityPriority::Recommended,
68        });
69    }
70
71    out
72}
73
74/// Log a report of resolved vs missing capabilities for the given domains.
75pub fn log_capability_bootstrap_report(
76    runner_host: &DemoRunnerHost,
77    ctx: &OperatorContext,
78    domains: &[Domain],
79) {
80    let scope = ResolveScope {
81        env: std::env::var("GREENTIC_ENV").ok(),
82        tenant: Some(ctx.tenant.clone()),
83        team: ctx.team.clone(),
84    };
85    let expectations = capability_expectations_for_domains(domains);
86    let mut missing_required = Vec::new();
87    let mut missing_recommended = Vec::new();
88    for item in &expectations {
89        let resolved = runner_host.resolve_capability(item.cap_id, None, scope.clone());
90        if resolved.is_none()
91            && item.cap_id == "greentic.cap.secrets.store.v1"
92            && domains.contains(&Domain::Secrets)
93            && runner_host.has_provider_packs_for_domain(Domain::Secrets)
94        {
95            operator_log::info(
96                module_path!(),
97                "capability bootstrap: using legacy secrets providers fallback for greentic.cap.secrets.store.v1",
98            );
99            continue;
100        }
101        if resolved.is_none() {
102            match item.priority {
103                CapabilityPriority::Required => missing_required.push(item.cap_id.to_string()),
104                CapabilityPriority::Recommended => {
105                    missing_recommended.push(item.cap_id.to_string())
106                }
107            }
108        }
109    }
110
111    let pending_setup = runner_host.capability_setup_plan(ctx);
112    if pending_setup.is_empty() {
113        operator_log::info(
114            module_path!(),
115            "capability setup plan: no capabilities requiring setup found",
116        );
117    } else {
118        let ids = pending_setup
119            .iter()
120            .map(|binding| format!("{}@{}", binding.cap_id, binding.stable_id))
121            .collect::<Vec<_>>()
122            .join(", ");
123        operator_log::info(
124            module_path!(),
125            format!(
126                "capability setup plan pending={} [{}]",
127                pending_setup.len(),
128                ids
129            ),
130        );
131    }
132
133    if !missing_required.is_empty() {
134        let joined = missing_required.join(", ");
135        operator_log::warn(
136            module_path!(),
137            format!("missing required capabilities for setup/start: {joined}"),
138        );
139        eprintln!(
140            "{}",
141            operator_i18n::trf(
142                "cli.capability.bootstrap.missing_required",
143                "Warning: missing required capabilities: {}",
144                &[&joined]
145            )
146        );
147    }
148    if !missing_recommended.is_empty() {
149        let joined = missing_recommended.join(", ");
150        operator_log::warn(
151            module_path!(),
152            format!("missing recommended capabilities for setup/start: {joined}"),
153        );
154        eprintln!(
155            "{}",
156            operator_i18n::trf(
157                "cli.capability.bootstrap.missing_recommended",
158                "Note: missing recommended capabilities: {}",
159                &[&joined]
160            )
161        );
162    }
163}
164
// ---------------------------------------------------------------------------
// Telemetry capability upgrade
// ---------------------------------------------------------------------------
168
/// Capability id resolved to locate the telemetry provider pack.
const CAP_TELEMETRY_V1: &str = "greentic.cap.telemetry.v1";
/// Operation invoked on the telemetry capability to obtain exporter settings.
const TELEMETRY_CONFIGURE_OP: &str = "telemetry.configure";
/// Service name used when the provider leaves `service_name` unset or blank.
const DEFAULT_TELEMETRY_SERVICE_NAME: &str = "greentic-operator";
172
173#[derive(Debug, Clone, serde::Deserialize)]
174struct LegacyTelemetryProviderConfig {
175    #[serde(default)]
176    service_name: Option<String>,
177    #[serde(default)]
178    export_mode: Option<String>,
179    #[serde(default)]
180    preset: Option<String>,
181    #[serde(default)]
182    endpoint: Option<String>,
183    #[serde(default, alias = "otlp_endpoint")]
184    otlp_endpoint: Option<String>,
185    #[serde(default)]
186    headers: HashMap<String, String>,
187    #[serde(default, alias = "otlp_headers")]
188    otlp_headers: HashMap<String, String>,
189    #[serde(default)]
190    sampling_ratio: Option<f64>,
191    #[serde(default)]
192    compression: Option<String>,
193}
194
195impl LegacyTelemetryProviderConfig {
196    fn service_name(&self) -> &str {
197        self.service_name
198            .as_deref()
199            .filter(|value| !value.trim().is_empty())
200            .unwrap_or(DEFAULT_TELEMETRY_SERVICE_NAME)
201    }
202
203    fn endpoint(&self) -> Option<&str> {
204        self.endpoint
205            .as_deref()
206            .filter(|value| !value.trim().is_empty())
207            .or_else(|| {
208                self.otlp_endpoint
209                    .as_deref()
210                    .filter(|value| !value.trim().is_empty())
211            })
212    }
213
214    fn merged_headers(&self) -> HashMap<String, String> {
215        let mut headers = self.otlp_headers.clone();
216        headers.extend(self.headers.clone());
217        headers
218    }
219
220    fn to_runtime_config(
221        &self,
222    ) -> Result<(
223        greentic_telemetry::TelemetryConfig,
224        greentic_telemetry::export::ExportConfig,
225    )> {
226        use greentic_telemetry::export::{
227            Compression as RuntimeCompression, ExportConfig, ExportMode,
228            Sampling as RuntimeSampling,
229        };
230
231        let preset = self.preset.as_deref().map(parse_cloud_preset).transpose()?;
232        let preset_config = preset
233            .map(greentic_telemetry::presets::load_preset)
234            .transpose()?
235            .unwrap_or_default();
236
237        let mode = if let Some(export_mode) = self.export_mode.as_deref() {
238            parse_export_mode(export_mode)?
239        } else {
240            preset_config.export_mode.unwrap_or(ExportMode::JsonStdout)
241        };
242
243        let endpoint = self
244            .endpoint()
245            .map(str::to_owned)
246            .or(preset_config.otlp_endpoint);
247
248        let mut headers = preset_config.otlp_headers;
249        headers.extend(self.merged_headers());
250
251        let sampling = match self.sampling_ratio {
252            Some(ratio) if !(0.0..=1.0).contains(&ratio) => {
253                return Err(anyhow!(
254                    "telemetry.configure returned sampling_ratio outside 0.0..=1.0: {ratio}"
255                ));
256            }
257            Some(ratio) if ratio <= 0.0 => RuntimeSampling::AlwaysOff,
258            Some(ratio) if ratio >= 1.0 => RuntimeSampling::AlwaysOn,
259            Some(ratio) => RuntimeSampling::TraceIdRatio(ratio),
260            None => RuntimeSampling::Parent,
261        };
262
263        let compression = self
264            .compression
265            .as_deref()
266            .map(parse_compression)
267            .transpose()?;
268
269        Ok((
270            greentic_telemetry::TelemetryConfig {
271                service_name: self.service_name().to_string(),
272            },
273            ExportConfig {
274                mode,
275                endpoint,
276                headers,
277                sampling,
278                compression: compression.map(|value| match value {
279                    CompressionCompat::Gzip => RuntimeCompression::Gzip,
280                }),
281            },
282        ))
283    }
284}
285
/// Local stand-in for the runtime compression setting. It is produced by
/// `parse_compression` and mapped to the telemetry crate's type in
/// `to_runtime_config`.
#[derive(Debug, Clone, Copy)]
enum CompressionCompat {
    /// gzip-compress exported telemetry.
    Gzip,
}
290
291fn parse_export_mode(value: &str) -> Result<greentic_telemetry::export::ExportMode> {
292    use greentic_telemetry::export::ExportMode;
293
294    match value.trim().to_ascii_lowercase().as_str() {
295        "json-stdout" | "json_stdout" | "stdout" => Ok(ExportMode::JsonStdout),
296        "otlp-grpc" | "otlp_grpc" => Ok(ExportMode::OtlpGrpc),
297        "otlp-http" | "otlp_http" => Ok(ExportMode::OtlpHttp),
298        other => Err(anyhow!("unsupported telemetry export_mode '{other}'")),
299    }
300}
301
302fn parse_cloud_preset(value: &str) -> Result<greentic_telemetry::presets::CloudPreset> {
303    use greentic_telemetry::presets::CloudPreset;
304
305    match value.trim().to_ascii_lowercase().as_str() {
306        "aws" => Ok(CloudPreset::Aws),
307        "gcp" => Ok(CloudPreset::Gcp),
308        "azure" => Ok(CloudPreset::Azure),
309        "datadog" => Ok(CloudPreset::Datadog),
310        "loki" => Ok(CloudPreset::Loki),
311        "none" => Ok(CloudPreset::None),
312        other => Err(anyhow!("unsupported telemetry preset '{other}'")),
313    }
314}
315
316fn parse_compression(value: &str) -> Result<CompressionCompat> {
317    match value.trim().to_ascii_lowercase().as_str() {
318        "gzip" => Ok(CompressionCompat::Gzip),
319        other => Err(anyhow!("unsupported telemetry compression '{other}'")),
320    }
321}
322
323fn validate_telemetry_config(config: &LegacyTelemetryProviderConfig) -> Vec<String> {
324    let mut warnings = Vec::new();
325
326    if config.export_mode.is_none() && config.preset.is_none() {
327        warnings.push(
328            "telemetry.configure returned no export_mode or preset; defaulting to json-stdout"
329                .to_string(),
330        );
331    }
332
333    if matches!(
334        config.export_mode
335            .as_deref()
336            .map(|value| value.trim().to_ascii_lowercase()),
337        Some(mode) if mode == "otlp-grpc" || mode == "otlp_grpc" || mode == "otlp-http" || mode == "otlp_http"
338    ) && config.endpoint().is_none()
339        && config.preset.is_none()
340    {
341        warnings.push(
342            "telemetry.configure returned OTLP mode without endpoint or preset; runtime defaults will be used"
343                .to_string(),
344        );
345    }
346
347    warnings
348}
349
350/// Seed secrets for the telemetry capability pack, invoke the WASM component,
351/// and initialize the OTel pipeline with the returned config.
352///
353/// When `setup_answers` is provided (e.g. from `--setup-input`), the values are
354/// persisted to the dev secrets store so the WASM component can read them via
355/// `read_secret()`.  Without this step the component receives empty secrets and
356/// the exporter has no valid connection string / endpoint.
357///
358/// Returns `Ok(true)` if telemetry was upgraded, `Ok(false)` if no capability found.
359pub fn try_upgrade_telemetry(
360    bundle: &Path,
361    runner_host: &DemoRunnerHost,
362    tenant: &str,
363    team: Option<&str>,
364    env_override: Option<&str>,
365    setup_answers: Option<&serde_json::Value>,
366) -> Result<bool> {
367    let env = resolve_env(env_override);
368    let scope = ResolveScope {
369        env: Some(env.clone()),
370        tenant: Some(tenant.to_string()),
371        team: team.map(|t| t.to_string()),
372    };
373
374    // 1. Resolve the telemetry capability
375    let Some(binding) = runner_host.resolve_capability(CAP_TELEMETRY_V1, None, scope) else {
376        tracing::debug!("no telemetry capability found — skipping upgrade");
377        return Ok(false);
378    };
379    tracing::info!(
380        pack_id = %binding.pack_id,
381        stable_id = %binding.stable_id,
382        "resolved telemetry capability"
383    );
384
385    // 2. Seed secrets for the telemetry pack
386    if let Ok(secrets_setup) = SecretsSetup::new(bundle, &env, tenant, team) {
387        if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
388            .enable_all()
389            .build()
390        {
391            if let Err(e) =
392                rt.block_on(secrets_setup.ensure_pack_secrets(&binding.pack_path, &binding.pack_id))
393            {
394                tracing::warn!(
395                    pack_id = %binding.pack_id,
396                    error = %e,
397                    "telemetry capability secret seeding failed"
398                );
399            }
400        }
401    }
402
403    // 2b. Persist setup_answers as secrets so the WASM component can read them.
404    //     The wizard execute flow only persists secrets for domain providers
405    //     (Messaging/Events/Secrets/OAuth) but capability packs like
406    //     telemetry-otlp are not in any domain, so their config values never
407    //     reach the dev store.
408    if let Some(answers) = setup_answers {
409        if answers.as_object().is_some_and(|m| !m.is_empty()) {
410            let pack_path_ref = Some(binding.pack_path.as_path());
411            let persist_rt = tokio::runtime::Builder::new_current_thread()
412                .enable_all()
413                .build();
414            if let Ok(rt) = persist_rt {
415                match rt.block_on(crate::qa_persist::persist_all_config_as_secrets(
416                    bundle,
417                    &env,
418                    tenant,
419                    team,
420                    &binding.pack_id,
421                    answers,
422                    pack_path_ref,
423                )) {
424                    Ok(saved) if !saved.is_empty() => {
425                        tracing::info!(
426                            pack_id = %binding.pack_id,
427                            count = saved.len(),
428                            keys = ?saved,
429                            "persisted telemetry setup answers as secrets"
430                        );
431                    }
432                    Err(e) => {
433                        tracing::warn!(
434                            pack_id = %binding.pack_id,
435                            error = %e,
436                            "failed to persist telemetry setup answers"
437                        );
438                    }
439                    _ => {}
440                }
441            }
442        }
443    }
444
445    // 3. Mark the telemetry capability as ready before invoking.
446    //    The telemetry bootstrap runs before the server starts, so there is
447    //    no prior QA wizard step that would create the install record.
448    //    Without this, invoke_capability returns capability_not_installed.
449    let ctx = OperatorContext {
450        tenant: tenant.to_string(),
451        team: team.map(|t| t.to_string()),
452        correlation_id: None,
453    };
454    if let Err(e) = runner_host.mark_capability_ready(&ctx, &binding) {
455        tracing::warn!(error = %e, "failed to mark telemetry capability as ready (non-fatal)");
456    }
457
458    // 4. Invoke the WASM component
459    let payload = serde_json::json!({});
460    let payload_bytes = serde_json::to_vec(&payload)?;
461
462    let outcome = runner_host.invoke_capability(
463        CAP_TELEMETRY_V1,
464        TELEMETRY_CONFIGURE_OP,
465        &payload_bytes,
466        &ctx,
467    )?;
468
469    if !outcome.success {
470        let error_msg = outcome.error.unwrap_or_else(|| "unknown error".to_string());
471        tracing::warn!(error = %error_msg, "telemetry.configure capability invocation failed");
472        return Ok(false);
473    }
474
475    // 5. Parse the telemetry provider output using a local compatibility shim.
476    let raw_output = match outcome.output {
477        Some(value) => value,
478        None => {
479            tracing::warn!("telemetry.configure returned no output");
480            return Ok(false);
481        }
482    };
483
484    tracing::debug!(config = %raw_output, "telemetry provider config received");
485
486    // The WASM component wraps output in {"ok":true,"output":{...}}
487    // Extract the inner "output" field if present.
488    let config_json = if let Some(inner) = raw_output.get("output") {
489        inner.clone()
490    } else {
491        raw_output
492    };
493
494    let config: LegacyTelemetryProviderConfig = serde_json::from_value(config_json)?;
495
496    // 6. Validate config
497    let warnings = validate_telemetry_config(&config);
498    for warning in &warnings {
499        tracing::warn!(warning = %warning, "telemetry config validation");
500    }
501
502    // 7. Initialize OTel pipeline
503    let (telemetry_config, export_config) = config.to_runtime_config()?;
504    greentic_telemetry::init_telemetry_from_config(telemetry_config, export_config)?;
505
506    tracing::info!(
507        export_mode = ?config.export_mode,
508        preset = ?config.preset,
509        endpoint = ?config.endpoint(),
510        sampling_ratio = config.sampling_ratio,
511        "telemetry upgraded from capability provider"
512    );
513
514    Ok(true)
515}
516
// ---------------------------------------------------------------------------
// State store capability upgrade (in-memory → Redis)
// ---------------------------------------------------------------------------
520
/// Capability id resolved to locate the key-value state provider pack.
const CAP_STATE_KV_V1: &str = "greentic.cap.state.kv.v1";
522
523/// Try to upgrade the state store from in-memory to Redis by reading
524/// the `redis_url` secret from the state-redis capability pack.
525///
526/// When `setup_answers` is provided, the values are persisted to the dev
527/// secrets store first (same pattern as telemetry).
528///
529/// Returns `Ok(Some(store))` with a Redis-backed state store on success,
530/// `Ok(None)` if no state capability found or Redis URL unavailable.
531pub fn try_upgrade_state_store(
532    bundle: &Path,
533    runner_host: &DemoRunnerHost,
534    secrets_handle: &SecretsManagerHandle,
535    tenant: &str,
536    team: Option<&str>,
537    env_override: Option<&str>,
538    setup_answers: Option<&serde_json::Value>,
539) -> Result<Option<DynStateStore>> {
540    let env = resolve_env(env_override);
541    let scope = ResolveScope {
542        env: Some(env.clone()),
543        tenant: Some(tenant.to_string()),
544        team: team.map(|t| t.to_string()),
545    };
546
547    // 1. Resolve the state.kv capability
548    let Some(binding) = runner_host.resolve_capability(CAP_STATE_KV_V1, None, scope) else {
549        eprintln!(
550            "[state-store] no capability '{}' found — using in-memory",
551            CAP_STATE_KV_V1
552        );
553        return Ok(None);
554    };
555    eprintln!(
556        "[state-store] resolved capability: pack_id={} stable_id={}",
557        binding.pack_id, binding.stable_id
558    );
559
560    // 2. Seed secrets for the state pack
561    if let Ok(secrets_setup) = SecretsSetup::new(bundle, &env, tenant, team) {
562        if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
563            .enable_all()
564            .build()
565        {
566            if let Err(e) =
567                rt.block_on(secrets_setup.ensure_pack_secrets(&binding.pack_path, &binding.pack_id))
568            {
569                tracing::warn!(
570                    pack_id = %binding.pack_id,
571                    error = %e,
572                    "state capability secret seeding failed"
573                );
574            }
575        }
576    }
577
578    // 2b. Persist setup_answers as secrets (same as telemetry — capability packs
579    //     are not in any domain, so the wizard doesn't persist their config).
580    if let Some(answers) = setup_answers {
581        if answers.as_object().is_some_and(|m| !m.is_empty()) {
582            let pack_path_ref = Some(binding.pack_path.as_path());
583            let persist_rt = tokio::runtime::Builder::new_current_thread()
584                .enable_all()
585                .build();
586            if let Ok(rt) = persist_rt {
587                match rt.block_on(crate::qa_persist::persist_all_config_as_secrets(
588                    bundle,
589                    &env,
590                    tenant,
591                    team,
592                    &binding.pack_id,
593                    answers,
594                    pack_path_ref,
595                )) {
596                    Ok(saved) if !saved.is_empty() => {
597                        tracing::info!(
598                            pack_id = %binding.pack_id,
599                            count = saved.len(),
600                            keys = ?saved,
601                            "persisted state-redis setup answers as secrets"
602                        );
603                    }
604                    Err(e) => {
605                        tracing::warn!(
606                            pack_id = %binding.pack_id,
607                            error = %e,
608                            "failed to persist state-redis setup answers"
609                        );
610                    }
611                    _ => {}
612                }
613            }
614        }
615    }
616
617    // 3. Read the redis_url secret
618    let canonical_team = crate::secrets_manager::canonical_team(team);
619    let secret_uri = format!(
620        "secrets://{}/{}/{}/{}/redis_url",
621        env, tenant, canonical_team, binding.pack_id
622    );
623
624    eprintln!("[state-store] reading secret: {}", secret_uri);
625    let redis_url = {
626        let rt = tokio::runtime::Builder::new_current_thread()
627            .enable_all()
628            .build()?;
629        let manager = secrets_handle.manager();
630        match rt.block_on(manager.read(&secret_uri)) {
631            Ok(bytes) => {
632                let url = String::from_utf8(bytes).ok();
633                eprintln!(
634                    "[state-store] redis_url secret found (len={})",
635                    url.as_ref().map_or(0, |s| s.len())
636                );
637                url
638            }
639            Err(e) => {
640                eprintln!("[state-store] failed to read redis_url secret: {e}");
641                None
642            }
643        }
644    };
645
646    let Some(redis_url) = redis_url else {
647        // Fallback: try REDIS_URL environment variable
648        match std::env::var("REDIS_URL") {
649            Ok(url) => {
650                tracing::info!("using REDIS_URL environment variable for state store");
651                return create_redis_store(&url);
652            }
653            Err(_) => {
654                tracing::warn!(
655                    "redis_url secret not found and REDIS_URL env not set — using in-memory state store"
656                );
657                return Ok(None);
658            }
659        }
660    };
661
662    create_redis_store(&redis_url)
663}
664
665fn create_redis_store(redis_url: &str) -> Result<Option<DynStateStore>> {
666    match RedisStateStore::from_url(redis_url) {
667        Ok(store) => {
668            let store: DynStateStore = Arc::new(store);
669            eprintln!("[state-store] ✓ upgraded to Redis: {}", redis_url);
670            tracing::info!(
671                redis_url = %redis_url,
672                "state store upgraded to Redis"
673            );
674            Ok(Some(store))
675        }
676        Err(e) => {
677            eprintln!("[state-store] ✗ failed to create Redis store: {e}");
678            tracing::warn!(
679                error = %e,
680                "failed to create Redis state store — using in-memory fallback"
681            );
682            Ok(None)
683        }
684    }
685}