Skip to main content

greentic_operator/demo/
runner_host.rs

1use std::collections::{BTreeMap, HashMap};
2use std::env;
3use std::fs;
4use std::io::Read;
5#[cfg(unix)]
6use std::os::unix::fs::PermissionsExt;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use anyhow::{Context, anyhow};
11use base64::{Engine as _, engine::general_purpose};
12use greentic_runner_desktop::RunStatus;
13use greentic_runner_host::{
14    RunnerWasiPolicy,
15    component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx},
16    config::{
17        FlowRetryConfig, HostConfig, OperatorPolicy, RateLimits, SecretsPolicy, StateStorePolicy,
18        WebhookPolicy,
19    },
20    pack::{ComponentResolution, PackRuntime},
21    storage::{DynSessionStore, DynStateStore, new_state_store},
22    trace::TraceConfig,
23    validate::ValidationConfig,
24};
25use greentic_types::cbor::canonical;
26use greentic_types::decode_pack_manifest;
27use serde::{Deserialize, Serialize};
28use serde_json::{Value as JsonValue, json};
29use tokio::runtime::Runtime as TokioRuntime;
30use zip::ZipArchive;
31
32/// Create a Tokio runtime for blocking async operations.
33/// When called from within an existing runtime (e.g., HTTP ingress handler),
34/// spawns a dedicated thread to avoid "Cannot start a runtime from within a
35/// runtime" panics.
36fn make_runtime_or_thread_scope<F, T>(f: F) -> T
37where
38    F: FnOnce(&TokioRuntime) -> T + Send,
39    T: Send,
40{
41    if tokio::runtime::Handle::try_current().is_ok() {
42        std::thread::scope(|s| {
43            s.spawn(|| {
44                let rt = TokioRuntime::new().expect("failed to create tokio runtime");
45                f(&rt)
46            })
47            .join()
48            .expect("provider invocation thread panicked")
49        })
50    } else {
51        let rt = TokioRuntime::new().expect("failed to create tokio runtime");
52        f(&rt)
53    }
54}
55
56use crate::runner_exec;
57use crate::runner_integration;
58use crate::runner_integration::RunFlowOptions;
59use crate::runner_integration::RunnerFlavor;
60use crate::runner_integration::run_flow_with_options;
61
62use crate::capabilities::{
63    CAP_OAUTH_BROKER_V1, CAP_OAUTH_TOKEN_VALIDATION_V1, CapabilityBinding, CapabilityInstallRecord,
64    CapabilityPackRecord, CapabilityRegistry, HookStage, OAUTH_OP_AWAIT_RESULT,
65    OAUTH_OP_GET_ACCESS_TOKEN, OAUTH_OP_INITIATE_AUTH, OAUTH_OP_REQUEST_RESOURCE_TOKEN,
66    ResolveScope, is_binding_ready, is_oauth_broker_operation, write_install_record,
67};
68use crate::cards::CardRenderer;
69use crate::discovery;
70use crate::domains::{self, Domain, ProviderPack};
71use crate::operator_log;
72use crate::secrets_gate::{self, DynSecretsManager, SecretsManagerHandle};
73use crate::secrets_manager;
74use crate::state_layout;
75
/// Caller identity attached to every operator invocation in this module.
#[derive(Clone)]
pub struct OperatorContext {
    /// Tenant the operation runs for.
    pub tenant: String,
    /// Optional team; log lines in this file print `default` when it is absent.
    pub team: Option<String>,
    /// Optional correlation id; log lines in this file print `none` when it is absent.
    pub correlation_id: Option<String>,
}
82
/// Execution path recorded on a [`FlowOutcome`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunnerExecutionMode {
    /// Reported by the exec path (also used for outcomes synthesised on denial).
    Exec,
    /// Reported by the integration path.
    Integration,
}
88
89#[derive(Clone)]
90pub struct FlowOutcome {
91    pub success: bool,
92    pub output: Option<JsonValue>,
93    pub raw: Option<String>,
94    pub error: Option<String>,
95    pub mode: RunnerExecutionMode,
96}
97
98#[derive(Clone, Debug, Serialize, Deserialize)]
99#[serde(rename_all = "snake_case")]
100enum OperationStatus {
101    Pending,
102    Denied,
103    Ok,
104    Err,
105}
106
107#[derive(Clone, Debug, Serialize, Deserialize)]
108struct OperationEnvelopeContext {
109    tenant: String,
110    team: Option<String>,
111    correlation_id: Option<String>,
112    #[serde(default, skip_serializing_if = "Option::is_none")]
113    auth_claims: Option<JsonValue>,
114}
115
116#[derive(Clone, Debug, Serialize, Deserialize)]
117struct OperationEnvelope {
118    op_id: String,
119    op_name: String,
120    ctx: OperationEnvelopeContext,
121    payload_cbor: Vec<u8>,
122    meta_cbor: Option<Vec<u8>>,
123    status: OperationStatus,
124    result_cbor: Option<Vec<u8>>,
125}
126
127impl OperationEnvelope {
128    fn new(op_name: &str, payload: &[u8], ctx: &OperatorContext) -> Self {
129        Self {
130            op_id: uuid::Uuid::new_v4().to_string(),
131            op_name: op_name.to_string(),
132            ctx: OperationEnvelopeContext {
133                tenant: ctx.tenant.clone(),
134                team: ctx.team.clone(),
135                correlation_id: ctx.correlation_id.clone(),
136                auth_claims: None,
137            },
138            payload_cbor: payload.to_vec(),
139            meta_cbor: None,
140            status: OperationStatus::Pending,
141            result_cbor: None,
142        }
143    }
144}
145
146#[derive(Debug, Serialize)]
147struct HookEvalRequest {
148    stage: String,
149    op_name: String,
150    envelope: OperationEnvelope,
151}
152
153#[derive(Debug, Deserialize)]
154struct HookEvalResponse {
155    decision: String,
156    #[serde(default)]
157    reason: Option<String>,
158    #[serde(default)]
159    envelope: Option<OperationEnvelope>,
160}
161
/// Verdict produced by evaluating a hook chain or the token-validation pre-hook.
#[derive(Debug)]
enum HookChainOutcome {
    /// Nothing objected; the operation may proceed.
    Continue,
    /// The operation was denied; carries the reason.
    Denied(String),
}
167
168#[derive(Clone, Debug)]
169enum RunnerMode {
170    Exec,
171    Integration {
172        binary: PathBuf,
173        flavor: RunnerFlavor,
174    },
175}
176
177#[derive(Clone)]
178pub struct DemoRunnerHost {
179    bundle_root: PathBuf,
180    runner_mode: RunnerMode,
181    catalog: HashMap<(Domain, String), ProviderPack>,
182    packs_by_path: BTreeMap<PathBuf, ProviderPack>,
183    capability_registry: CapabilityRegistry,
184    secrets_handle: SecretsManagerHandle,
185    card_renderer: CardRenderer,
186    state_store: DynStateStore,
187    debug_enabled: bool,
188}
189
190impl DemoRunnerHost {
191    pub fn bundle_root(&self) -> &Path {
192        &self.bundle_root
193    }
194
195    pub fn secrets_manager(&self) -> DynSecretsManager {
196        self.secrets_handle.manager()
197    }
198
199    pub fn secrets_handle(&self) -> &SecretsManagerHandle {
200        &self.secrets_handle
201    }
202
203    pub fn state_store(&self) -> DynStateStore {
204        self.state_store.clone()
205    }
206
207    /// Replace the state store (e.g. upgrade from in-memory to Redis).
208    pub fn set_state_store(&mut self, store: DynStateStore) {
209        self.state_store = store;
210    }
211
212    pub fn new(
213        bundle_root: PathBuf,
214        discovery: &discovery::DiscoveryResult,
215        runner_binary: Option<PathBuf>,
216        secrets_handle: SecretsManagerHandle,
217        debug_enabled: bool,
218    ) -> anyhow::Result<Self> {
219        let runner_binary = runner_binary.and_then(validate_runner_binary);
220        let mode = if let Some(ref binary) = runner_binary {
221            let flavor = runner_integration::detect_runner_flavor(binary);
222            RunnerMode::Integration {
223                binary: binary.clone(),
224                flavor,
225            }
226        } else {
227            RunnerMode::Exec
228        };
229        let mut catalog = HashMap::new();
230        let mut packs_by_path = BTreeMap::new();
231        let mut pack_index: BTreeMap<PathBuf, CapabilityPackRecord> = BTreeMap::new();
232        let provider_map = discovery
233            .providers
234            .iter()
235            .map(|provider| (provider.pack_path.clone(), provider.provider_id.clone()))
236            .collect::<HashMap<_, _>>();
237        for domain in [
238            Domain::Messaging,
239            Domain::Events,
240            Domain::Secrets,
241            Domain::OAuth,
242        ] {
243            let is_demo_bundle = bundle_root.join("greentic.demo.yaml").exists();
244            let packs = if is_demo_bundle {
245                domains::discover_provider_packs_cbor_only(&bundle_root, domain)?
246            } else {
247                domains::discover_provider_packs(&bundle_root, domain)?
248            };
249            for pack in packs {
250                packs_by_path.insert(pack.path.clone(), pack.clone());
251                pack_index.insert(
252                    pack.path.clone(),
253                    CapabilityPackRecord {
254                        pack_id: pack.pack_id.clone(),
255                        domain,
256                    },
257                );
258                let provider_type = provider_map
259                    .get(&pack.path)
260                    .cloned()
261                    .unwrap_or_else(|| pack.pack_id.clone());
262                catalog.insert((domain, provider_type.clone()), pack.clone());
263                if provider_type != pack.pack_id {
264                    catalog.insert((domain, pack.pack_id.clone()), pack.clone());
265                }
266            }
267        }
268        let capability_registry = CapabilityRegistry::build_from_pack_index(&pack_index)?;
269        Ok(Self {
270            bundle_root,
271            runner_mode: mode,
272            catalog,
273            packs_by_path,
274            capability_registry,
275            secrets_handle,
276            card_renderer: CardRenderer::new(),
277            state_store: new_state_store(),
278            debug_enabled,
279        })
280    }
281
282    pub fn debug_enabled(&self) -> bool {
283        self.debug_enabled
284    }
285
286    /// Return the canonical `provider_type` stored inside a provider pack manifest
287    /// (e.g. `"messaging.webex.bot"`).  Falls back to the lookup key when the pack
288    /// is not found or the manifest cannot be read.
289    pub fn canonical_provider_type(&self, domain: Domain, lookup_key: &str) -> String {
290        if let Some(pack) = self.catalog.get(&(domain, lookup_key.to_string())) {
291            primary_provider_type(&pack.path).unwrap_or_else(|_| lookup_key.to_string())
292        } else {
293            lookup_key.to_string()
294        }
295    }
296
297    pub fn resolve_capability(
298        &self,
299        cap_id: &str,
300        min_version: Option<&str>,
301        scope: ResolveScope,
302    ) -> Option<CapabilityBinding> {
303        self.capability_registry
304            .resolve(cap_id, min_version, &scope)
305    }
306
307    pub fn resolve_hook_chain(&self, stage: HookStage, op_name: &str) -> Vec<CapabilityBinding> {
308        self.capability_registry.resolve_hook_chain(stage, op_name)
309    }
310
311    pub fn has_provider_packs_for_domain(&self, domain: Domain) -> bool {
312        self.catalog
313            .keys()
314            .any(|(entry_domain, _)| *entry_domain == domain)
315    }
316
317    pub fn capability_setup_plan(&self, ctx: &OperatorContext) -> Vec<CapabilityBinding> {
318        let scope = ResolveScope {
319            env: env::var("GREENTIC_ENV").ok(),
320            tenant: Some(ctx.tenant.clone()),
321            team: ctx.team.clone(),
322        };
323        self.capability_registry
324            .offers_requiring_setup(&scope)
325            .into_iter()
326            .map(|offer| CapabilityBinding {
327                cap_id: offer.cap_id,
328                stable_id: offer.stable_id,
329                pack_id: offer.pack_id,
330                domain: offer.domain,
331                pack_path: offer.pack_path,
332                provider_component_ref: offer.provider_component_ref,
333                provider_op: offer.provider_op,
334                version: offer.version,
335                requires_setup: offer.requires_setup,
336                setup_qa_ref: offer.setup_qa_ref,
337            })
338            .collect()
339    }
340
341    pub fn mark_capability_ready(
342        &self,
343        ctx: &OperatorContext,
344        binding: &CapabilityBinding,
345    ) -> anyhow::Result<PathBuf> {
346        let record =
347            CapabilityInstallRecord::ready(&binding.cap_id, &binding.stable_id, &binding.pack_id);
348        write_install_record(&self.bundle_root, &ctx.tenant, ctx.team.as_deref(), &record)
349    }
350
351    pub fn mark_capability_failed(
352        &self,
353        ctx: &OperatorContext,
354        binding: &CapabilityBinding,
355        failure_key: &str,
356    ) -> anyhow::Result<PathBuf> {
357        let record = CapabilityInstallRecord::failed(
358            &binding.cap_id,
359            &binding.stable_id,
360            &binding.pack_id,
361            failure_key,
362        );
363        write_install_record(&self.bundle_root, &ctx.tenant, ctx.team.as_deref(), &record)
364    }
365
366    pub fn invoke_capability(
367        &self,
368        cap_id: &str,
369        op: &str,
370        payload_bytes: &[u8],
371        ctx: &OperatorContext,
372    ) -> anyhow::Result<FlowOutcome> {
373        let _span = tracing::info_span!(
374            "invoke_capability",
375            cap_id = %cap_id,
376            op = %op,
377            tenant = %ctx.tenant,
378        )
379        .entered();
380        let requested_op = op.trim();
381        if cap_id == CAP_OAUTH_BROKER_V1 {
382            if requested_op.is_empty() {
383                return Ok(capability_route_error_outcome(
384                    cap_id,
385                    "<missing-op>",
386                    format!(
387                        "oauth broker capability requires an explicit op (supported: {}, {}, {}, {})",
388                        OAUTH_OP_INITIATE_AUTH,
389                        OAUTH_OP_AWAIT_RESULT,
390                        OAUTH_OP_GET_ACCESS_TOKEN,
391                        OAUTH_OP_REQUEST_RESOURCE_TOKEN
392                    ),
393                ));
394            }
395            if !is_oauth_broker_operation(requested_op) {
396                return Ok(capability_route_error_outcome(
397                    cap_id,
398                    requested_op,
399                    format!(
400                        "unsupported oauth broker op `{requested_op}` (supported: {}, {}, {}, {})",
401                        OAUTH_OP_INITIATE_AUTH,
402                        OAUTH_OP_AWAIT_RESULT,
403                        OAUTH_OP_GET_ACCESS_TOKEN,
404                        OAUTH_OP_REQUEST_RESOURCE_TOKEN
405                    ),
406                ));
407            }
408        }
409        let scope = ResolveScope {
410            env: env::var("GREENTIC_ENV").ok(),
411            tenant: Some(ctx.tenant.clone()),
412            team: ctx.team.clone(),
413        };
414        let binding = if requested_op.is_empty() {
415            self.resolve_capability(cap_id, None, scope)
416        } else {
417            self.capability_registry
418                .resolve_for_op(cap_id, None, &scope, Some(requested_op))
419        };
420        let Some(binding) = binding else {
421            return Ok(missing_capability_outcome(cap_id, op, None));
422        };
423        if !is_binding_ready(
424            &self.bundle_root,
425            &ctx.tenant,
426            ctx.team.as_deref(),
427            &binding,
428        )? {
429            return Ok(capability_not_installed_outcome(
430                cap_id,
431                op,
432                &binding.stable_id,
433            ));
434        }
435
436        let Some(pack) = self.packs_by_path.get(&binding.pack_path) else {
437            return Ok(capability_route_error_outcome(
438                cap_id,
439                op,
440                format!("resolved pack not found at {}", binding.pack_path.display()),
441            ));
442        };
443
444        let target_op = if cap_id == CAP_OAUTH_BROKER_V1 || requested_op.is_empty() {
445            // OAuth broker cap.invoke always routes through the selected provider op.
446            binding.provider_op.as_str()
447        } else {
448            requested_op
449        };
450
451        // Capability invocations go through the same operator pipeline.
452        let mut envelope =
453            OperationEnvelope::new(&format!("cap.invoke:{cap_id}"), payload_bytes, ctx);
454        let token_validation_outcome =
455            self.evaluate_token_validation_pre_hook(&mut envelope, payload_bytes, ctx)?;
456        if let HookChainOutcome::Denied(reason) = token_validation_outcome {
457            envelope.status = OperationStatus::Denied;
458            self.emit_post_sub(&envelope);
459            return Ok(capability_route_error_outcome(
460                cap_id,
461                target_op,
462                format!("operation denied by pre-hook: {reason}"),
463            ));
464        }
465        let pre_chain = self.resolve_hook_chain(HookStage::Pre, &envelope.op_name);
466        let pre_hook_outcome =
467            self.evaluate_hook_chain(&pre_chain, HookStage::Pre, &mut envelope)?;
468        self.emit_pre_sub(&envelope);
469        if let HookChainOutcome::Denied(reason) = pre_hook_outcome {
470            envelope.status = OperationStatus::Denied;
471            self.emit_post_sub(&envelope);
472            return Ok(capability_route_error_outcome(
473                cap_id,
474                target_op,
475                format!("operation denied by pre-hook: {reason}"),
476            ));
477        }
478
479        let outcome = self.invoke_provider_component_op(
480            binding.domain,
481            pack,
482            &binding.pack_id,
483            target_op,
484            payload_bytes,
485            ctx,
486        )?;
487
488        envelope.status = if outcome.success {
489            OperationStatus::Ok
490        } else {
491            OperationStatus::Err
492        };
493        envelope.result_cbor = outcome.output.as_ref().and_then(json_to_canonical_cbor);
494        let post_chain = self.resolve_hook_chain(HookStage::Post, &envelope.op_name);
495        let _ = self.evaluate_hook_chain(&post_chain, HookStage::Post, &mut envelope)?;
496        self.emit_post_sub(&envelope);
497        Ok(outcome)
498    }
499
500    pub fn supports_op(&self, domain: Domain, provider_type: &str, op_id: &str) -> bool {
501        self.catalog
502            .get(&(domain, provider_type.to_string()))
503            .map(|pack| {
504                pack.entry_flows.iter().any(|flow| flow == op_id)
505                    || pack_supports_provider_op(&pack.path, op_id).unwrap_or(false)
506            })
507            .unwrap_or(false)
508    }
509
510    pub fn invoke_provider_op(
511        &self,
512        domain: Domain,
513        provider_type: &str,
514        op_id: &str,
515        payload_bytes: &[u8],
516        ctx: &OperatorContext,
517    ) -> anyhow::Result<FlowOutcome> {
518        let _span = tracing::info_span!(
519            "invoke_provider_op",
520            provider = %provider_type,
521            op = %op_id,
522            tenant = %ctx.tenant,
523        )
524        .entered();
525        let mut envelope = OperationEnvelope::new(op_id, payload_bytes, ctx);
526        let token_validation_outcome =
527            self.evaluate_token_validation_pre_hook(&mut envelope, payload_bytes, ctx)?;
528        if let HookChainOutcome::Denied(reason) = token_validation_outcome {
529            envelope.status = OperationStatus::Denied;
530            self.emit_pre_sub(&envelope);
531            self.emit_post_sub(&envelope);
532            return Ok(FlowOutcome {
533                success: false,
534                output: None,
535                raw: None,
536                error: Some(format!("operation denied by pre-hook: {reason}")),
537                mode: RunnerExecutionMode::Exec,
538            });
539        }
540        let pre_chain = self.resolve_hook_chain(HookStage::Pre, op_id);
541        let pre_hook_outcome =
542            self.evaluate_hook_chain(&pre_chain, HookStage::Pre, &mut envelope)?;
543        self.emit_pre_sub(&envelope);
544        if let HookChainOutcome::Denied(reason) = pre_hook_outcome {
545            envelope.status = OperationStatus::Denied;
546            self.emit_post_sub(&envelope);
547            return Ok(FlowOutcome {
548                success: false,
549                output: Some(serde_json::to_value(&envelope).unwrap_or_else(|_| json!({}))),
550                raw: None,
551                error: Some(format!("operation denied by pre-hook: {reason}")),
552                mode: RunnerExecutionMode::Exec,
553            });
554        }
555
556        let outcome =
557            self.invoke_provider_op_inner(domain, provider_type, op_id, payload_bytes, ctx)?;
558        envelope.status = if outcome.success {
559            OperationStatus::Ok
560        } else {
561            OperationStatus::Err
562        };
563        envelope.result_cbor = outcome.output.as_ref().and_then(json_to_canonical_cbor);
564
565        let post_chain = self.resolve_hook_chain(HookStage::Post, op_id);
566        let _ = self.evaluate_hook_chain(&post_chain, HookStage::Post, &mut envelope)?;
567        self.emit_post_sub(&envelope);
568        Ok(outcome)
569    }
570
571    fn invoke_provider_op_inner(
572        &self,
573        domain: Domain,
574        provider_type: &str,
575        op_id: &str,
576        payload_bytes: &[u8],
577        ctx: &OperatorContext,
578    ) -> anyhow::Result<FlowOutcome> {
579        let pack = self
580            .catalog
581            .get(&(domain, provider_type.to_string()))
582            .ok_or_else(|| {
583                anyhow::anyhow!(
584                    "provider {} not found for domain {}",
585                    provider_type,
586                    domains::domain_name(domain)
587                )
588            })?;
589
590        if pack.entry_flows.iter().any(|flow| flow == op_id) {
591            let flow_id = op_id;
592            if self.debug_enabled {
593                operator_log::debug(
594                    module_path!(),
595                    format!(
596                        "[demo dev] invoking provider domain={} provider={} flow={} tenant={} team={} payload_len={} preview={}",
597                        domains::domain_name(domain),
598                        provider_type,
599                        flow_id,
600                        ctx.tenant,
601                        ctx.team.as_deref().unwrap_or("default"),
602                        payload_bytes.len(),
603                        payload_preview(payload_bytes),
604                    ),
605                );
606            }
607            let run_dir = state_layout::run_dir(&self.bundle_root, domain, &pack.pack_id, flow_id)?;
608            std::fs::create_dir_all(&run_dir)?;
609
610            let render_outcome = self.card_renderer.render_if_needed(
611                provider_type,
612                payload_bytes,
613                |cap_id, op, input| {
614                    let outcome = self.invoke_capability(cap_id, op, input, ctx)?;
615                    if !outcome.success {
616                        let reason = outcome
617                            .error
618                            .clone()
619                            .or(outcome.raw.clone())
620                            .unwrap_or_else(|| "capability invocation failed".to_string());
621                        return Err(anyhow!(
622                            "card capability {}:{} failed: {}",
623                            cap_id,
624                            op,
625                            reason
626                        ));
627                    }
628                    outcome.output.ok_or_else(|| {
629                        anyhow!(
630                            "card capability {}:{} returned no structured output",
631                            cap_id,
632                            op
633                        )
634                    })
635                },
636            )?;
637            let payload = serde_json::from_slice(&render_outcome.bytes).unwrap_or_else(|_| {
638                json!({
639                    "payload": general_purpose::STANDARD.encode(&render_outcome.bytes)
640                })
641            });
642
643            let outcome = match &self.runner_mode {
644                RunnerMode::Exec => {
645                    self.execute_with_runner_exec(domain, pack, flow_id, &payload, ctx, &run_dir)?
646                }
647                RunnerMode::Integration { binary, flavor } => self
648                    .execute_with_runner_integration(
649                        domain, pack, flow_id, &payload, ctx, &run_dir, binary, *flavor,
650                    )?,
651            };
652
653            if self.debug_enabled {
654                operator_log::debug(
655                    module_path!(),
656                    format!(
657                        "[demo dev] provider={} flow={} tenant={} team={} success={} mode={:?} error={:?} corr_id={}",
658                        provider_type,
659                        flow_id,
660                        ctx.tenant,
661                        ctx.team.as_deref().unwrap_or("default"),
662                        outcome.success,
663                        outcome.mode,
664                        outcome.error,
665                        ctx.correlation_id.as_deref().unwrap_or("none"),
666                    ),
667                );
668            }
669            operator_log::info(
670                module_path!(),
671                format!(
672                    "invoke domain={} provider={} op={} mode={:?} corr={}",
673                    domains::domain_name(domain),
674                    provider_type,
675                    flow_id,
676                    outcome.mode,
677                    ctx.correlation_id.as_deref().unwrap_or("none")
678                ),
679            );
680
681            return Ok(outcome);
682        }
683
684        self.invoke_provider_component_op(domain, pack, provider_type, op_id, payload_bytes, ctx)
685    }
686
687    fn evaluate_hook_chain(
688        &self,
689        chain: &[CapabilityBinding],
690        stage: HookStage,
691        envelope: &mut OperationEnvelope,
692    ) -> anyhow::Result<HookChainOutcome> {
693        for binding in chain {
694            let Some(pack) = self.packs_by_path.get(&binding.pack_path) else {
695                operator_log::warn(
696                    module_path!(),
697                    format!(
698                        "hook binding skipped; pack not found stable_id={} path={}",
699                        binding.stable_id,
700                        binding.pack_path.display()
701                    ),
702                );
703                continue;
704            };
705
706            let payload = canonical::to_canonical_cbor(&HookEvalRequest {
707                stage: match stage {
708                    HookStage::Pre => "pre",
709                    HookStage::Post => "post",
710                }
711                .to_string(),
712                op_name: envelope.op_name.clone(),
713                envelope: envelope.clone(),
714            })
715            .map_err(|err| anyhow!("failed to encode hook request as cbor: {err}"))?;
716            let ctx = OperatorContext {
717                tenant: envelope.ctx.tenant.clone(),
718                team: envelope.ctx.team.clone(),
719                correlation_id: envelope.ctx.correlation_id.clone(),
720            };
721            let outcome = self.invoke_provider_component_op(
722                binding.domain,
723                pack,
724                &binding.pack_id,
725                &binding.provider_op,
726                &payload,
727                &ctx,
728            )?;
729            if !outcome.success {
730                operator_log::warn(
731                    module_path!(),
732                    format!(
733                        "hook invocation failed stage={:?} binding={} err={}",
734                        stage,
735                        binding.stable_id,
736                        outcome.error.unwrap_or_else(|| "unknown error".to_string())
737                    ),
738                );
739                continue;
740            }
741            let Some(output) = outcome.output else {
742                continue;
743            };
744            let parsed: HookEvalResponse = match decode_hook_response(&output) {
745                Ok(value) => value,
746                Err(err) => {
747                    operator_log::warn(
748                        module_path!(),
749                        format!(
750                            "hook response decode failed stage={:?} binding={} err={} (expected cbor, with legacy json fallback)",
751                            stage, binding.stable_id, err
752                        ),
753                    );
754                    continue;
755                }
756            };
757            if let Some(updated) = parsed.envelope {
758                *envelope = updated;
759            }
760            if parsed.decision.eq_ignore_ascii_case("deny") && matches!(stage, HookStage::Pre) {
761                let reason = parsed
762                    .reason
763                    .unwrap_or_else(|| "hook denied operation".to_string());
764                return Ok(HookChainOutcome::Denied(reason));
765            }
766        }
767        Ok(HookChainOutcome::Continue)
768    }
769
770    fn evaluate_token_validation_pre_hook(
771        &self,
772        envelope: &mut OperationEnvelope,
773        payload_bytes: &[u8],
774        ctx: &OperatorContext,
775    ) -> anyhow::Result<HookChainOutcome> {
776        if envelope
777            .op_name
778            .starts_with(&format!("cap.invoke:{CAP_OAUTH_TOKEN_VALIDATION_V1}"))
779        {
780            return Ok(HookChainOutcome::Continue);
781        }
782        let Some(validation_request) = extract_token_validation_request(payload_bytes) else {
783            return Ok(HookChainOutcome::Continue);
784        };
785        let scope = ResolveScope {
786            env: env::var("GREENTIC_ENV").ok(),
787            tenant: Some(ctx.tenant.clone()),
788            team: ctx.team.clone(),
789        };
790        let Some(binding) = self.resolve_capability(CAP_OAUTH_TOKEN_VALIDATION_V1, None, scope)
791        else {
792            return Ok(HookChainOutcome::Continue);
793        };
794        if !is_binding_ready(
795            &self.bundle_root,
796            &ctx.tenant,
797            ctx.team.as_deref(),
798            &binding,
799        )? {
800            return Ok(HookChainOutcome::Denied(format!(
801                "token validation capability is not installed (stable_id={})",
802                binding.stable_id
803            )));
804        }
805        let Some(pack) = self.packs_by_path.get(&binding.pack_path) else {
806            return Ok(HookChainOutcome::Denied(format!(
807                "token validation pack not found at {}",
808                binding.pack_path.display()
809            )));
810        };
811        let request_bytes = serde_json::to_vec(&validation_request)
812            .map_err(|err| anyhow!("failed to encode token validation payload: {err}"))?;
813        let outcome = self.invoke_provider_component_op(
814            binding.domain,
815            pack,
816            &binding.pack_id,
817            &binding.provider_op,
818            &request_bytes,
819            ctx,
820        )?;
821        if !outcome.success {
822            let reason = outcome
823                .error
824                .unwrap_or_else(|| "token validation capability invocation failed".to_string());
825            return Ok(HookChainOutcome::Denied(reason));
826        }
827        let Some(output) = outcome.output else {
828            return Ok(HookChainOutcome::Denied(
829                "token validation returned no output".to_string(),
830            ));
831        };
832        match evaluate_token_validation_output(&output) {
833            TokenValidationDecision::Allow(claims) => {
834                envelope.ctx.auth_claims = claims;
835                Ok(HookChainOutcome::Continue)
836            }
837            TokenValidationDecision::Deny(reason) => Ok(HookChainOutcome::Denied(reason)),
838        }
839    }
840
841    fn emit_pre_sub(&self, envelope: &OperationEnvelope) {
842        operator_log::info(
843            module_path!(),
844            format!(
845                "sub.pre op={} status={:?} tenant={} team={}",
846                envelope.op_name,
847                envelope.status,
848                envelope.ctx.tenant,
849                envelope.ctx.team.as_deref().unwrap_or("default")
850            ),
851        );
852    }
853
854    fn emit_post_sub(&self, envelope: &OperationEnvelope) {
855        operator_log::info(
856            module_path!(),
857            format!(
858                "sub.post op={} status={:?} tenant={} team={}",
859                envelope.op_name,
860                envelope.status,
861                envelope.ctx.tenant,
862                envelope.ctx.team.as_deref().unwrap_or("default")
863            ),
864        );
865    }
866
867    fn execute_with_runner_exec(
868        &self,
869        domain: Domain,
870        pack: &ProviderPack,
871        flow_id: &str,
872        payload: &JsonValue,
873        ctx: &OperatorContext,
874        _run_dir: &Path,
875    ) -> anyhow::Result<FlowOutcome> {
876        let request = runner_exec::RunRequest {
877            root: self.bundle_root.clone(),
878            domain,
879            pack_path: pack.path.clone(),
880            pack_label: pack.pack_id.clone(),
881            flow_id: flow_id.to_string(),
882            tenant: ctx.tenant.clone(),
883            team: ctx.team.clone(),
884            input: payload.clone(),
885            dist_offline: true,
886        };
887        let run_output = runner_exec::run_provider_pack_flow(request)?;
888        let parsed = read_transcript_outputs(&run_output.run_dir)?;
889        Ok(FlowOutcome {
890            success: run_output.result.status == RunStatus::Success,
891            output: parsed,
892            raw: None,
893            error: run_output.result.error.clone(),
894            mode: RunnerExecutionMode::Exec,
895        })
896    }
897
898    #[allow(clippy::too_many_arguments)]
899    fn execute_with_runner_integration(
900        &self,
901        _domain: Domain,
902        pack: &ProviderPack,
903        flow_id: &str,
904        payload: &JsonValue,
905        ctx: &OperatorContext,
906        run_dir: &Path,
907        runner_binary: &Path,
908        flavor: RunnerFlavor,
909    ) -> anyhow::Result<FlowOutcome> {
910        let output = run_flow_with_options(
911            runner_binary,
912            &pack.path,
913            flow_id,
914            payload,
915            RunFlowOptions {
916                dist_offline: true,
917                tenant: Some(&ctx.tenant),
918                team: ctx.team.as_deref(),
919                artifacts_dir: Some(run_dir),
920                runner_flavor: flavor,
921            },
922        )?;
923        let mut parsed = output.parsed.clone();
924        if parsed.is_none() {
925            parsed = read_transcript_outputs(run_dir)?;
926        }
927        let raw = if output.stdout.trim().is_empty() {
928            None
929        } else {
930            Some(output.stdout.clone())
931        };
932        Ok(FlowOutcome {
933            success: output.status.success(),
934            output: parsed,
935            raw,
936            error: if output.status.success() {
937                None
938            } else {
939                Some(output.stderr.clone())
940            },
941            mode: RunnerExecutionMode::Integration,
942        })
943    }
944
945    pub fn invoke_provider_component_op_direct(
946        &self,
947        domain: Domain,
948        pack: &ProviderPack,
949        provider_id: &str,
950        op_id: &str,
951        payload_bytes: &[u8],
952        ctx: &OperatorContext,
953    ) -> anyhow::Result<FlowOutcome> {
954        self.invoke_provider_component_op(domain, pack, provider_id, op_id, payload_bytes, ctx)
955    }
956
957    fn invoke_provider_component_op(
958        &self,
959        domain: Domain,
960        pack: &ProviderPack,
961        provider_id: &str,
962        op_id: &str,
963        payload_bytes: &[u8],
964        ctx: &OperatorContext,
965    ) -> anyhow::Result<FlowOutcome> {
966        if let RunnerMode::Integration { binary, flavor } = &self.runner_mode {
967            let payload_value: JsonValue =
968                serde_json::from_slice(payload_bytes).unwrap_or_else(|_| json!({}));
969            let run_dir = state_layout::run_dir(&self.bundle_root, domain, &pack.pack_id, op_id)?;
970            std::fs::create_dir_all(&run_dir)?;
971            return self.execute_with_runner_integration(
972                domain,
973                pack,
974                op_id,
975                &payload_value,
976                ctx,
977                &run_dir,
978                binary,
979                *flavor,
980            );
981        }
982
983        let payload = payload_bytes.to_vec();
984        let result = make_runtime_or_thread_scope(|runtime| {
985            runtime.block_on(async {
986                let host_config = Arc::new(build_demo_host_config(&ctx.tenant));
987                // Re-open the dev store on each invocation so newly-written secrets
988                // (e.g. from QA wizard submit) are visible without restarting the demo.
989                let fresh_secrets = secrets_gate::resolve_secrets_manager(
990                    &self.bundle_root,
991                    &ctx.tenant,
992                    ctx.team.as_deref(),
993                )
994                .unwrap_or_else(|_| self.secrets_handle.clone());
995                let dev_store_display = fresh_secrets
996                    .dev_store_path
997                    .as_ref()
998                    .map(|path| path.display().to_string())
999                    .unwrap_or_else(|| "<default>".to_string());
1000                operator_log::info(
1001                    module_path!(),
1002                    format!(
1003                        "secrets backend for wasm: using_env_fallback={} dev_store={}",
1004                        fresh_secrets.using_env_fallback, dev_store_display,
1005                    ),
1006                );
1007                operator_log::info(
1008                    module_path!(),
1009                    format!(
1010                        "exec secrets: dev_store={} env_fallback={}",
1011                        dev_store_display, fresh_secrets.using_env_fallback,
1012                    ),
1013                );
1014                let pack_runtime = PackRuntime::load(
1015                    &pack.path,
1016                    host_config.clone(),
1017                    None,
1018                    Some(&pack.path),
1019                    None::<DynSessionStore>,
1020                    Some(self.state_store.clone()),
1021                    Arc::new(RunnerWasiPolicy::default()),
1022                    fresh_secrets.runtime_manager(Some(&pack.pack_id)),
1023                    None,
1024                    false,
1025                    ComponentResolution::default(),
1026                )
1027                .await?;
1028                let provider_type = primary_provider_type(&pack.path)
1029                    .context("failed to determine provider type for direct invocation")?;
1030                let _env_value = env::var("GREENTIC_ENV").unwrap_or_else(|_| "<unset>".to_string());
1031                let _canonical_team =
1032                    secrets_manager::canonical_team(ctx.team.as_deref()).into_owned();
1033                let _runner_dev_store_desc = self
1034                    .secrets_handle
1035                    .dev_store_path
1036                    .as_ref()
1037                    .map(|path| path.display().to_string())
1038                    .unwrap_or_else(|| "<none>".to_string());
1039                let binding = pack_runtime.resolve_provider(None, Some(&provider_type))?;
1040                let exec_ctx = ComponentExecCtx {
1041                    tenant: ComponentTenantCtx {
1042                        tenant: ctx.tenant.clone(),
1043                        team: ctx.team.clone(),
1044                        i18n_id: None,
1045                        user: None,
1046                        trace_id: None,
1047                        correlation_id: ctx.correlation_id.clone(),
1048                        deadline_unix_ms: None,
1049                        attempt: 1,
1050                        idempotency_key: None,
1051                    },
1052                    i18n_id: None,
1053                    flow_id: op_id.to_string(),
1054                    node_id: Some(op_id.to_string()),
1055                };
1056                pack_runtime
1057                    .invoke_provider(&binding, exec_ctx, op_id, payload)
1058                    .await
1059            })
1060        });
1061
1062        match result {
1063            Ok(value) => Ok(FlowOutcome {
1064                success: true,
1065                output: Some(value),
1066                raw: None,
1067                error: None,
1068                mode: RunnerExecutionMode::Exec,
1069            }),
1070            Err(err) => {
1071                let err_message = err.to_string();
1072                let needs_context = needs_secret_context(&err_message);
1073                let enriched_err = if needs_context {
1074                    err.context(secret_error_context(ctx, provider_id, op_id, pack))
1075                } else {
1076                    err
1077                };
1078                let error_text = if needs_context {
1079                    enriched_err.to_string()
1080                } else {
1081                    err_message
1082                };
1083                Ok(FlowOutcome {
1084                    success: false,
1085                    output: None,
1086                    raw: None,
1087                    error: Some(error_text),
1088                    mode: RunnerExecutionMode::Exec,
1089                })
1090            }
1091        }
1092    }
1093}
1094
1095pub fn primary_provider_type(pack_path: &Path) -> anyhow::Result<String> {
1096    // Try pack.manifest.json first (source of truth) before falling back to
1097    // manifest.cbor, which may contain a stale provider_type after pack builds.
1098    if let Ok(json_type) = primary_provider_type_from_json(pack_path) {
1099        return Ok(json_type);
1100    }
1101    let file = std::fs::File::open(pack_path)?;
1102    let mut archive = ZipArchive::new(file)?;
1103    let mut manifest_entry = archive.by_name("manifest.cbor").map_err(|err| {
1104        anyhow!(
1105            "failed to open manifest.cbor in {}: {err}",
1106            pack_path.display()
1107        )
1108    })?;
1109    let mut bytes = Vec::new();
1110    manifest_entry.read_to_end(&mut bytes)?;
1111    let manifest = decode_pack_manifest(&bytes)
1112        .context("failed to decode pack manifest for provider introspection")?;
1113    let inline = manifest.provider_extension_inline().ok_or_else(|| {
1114        anyhow!(
1115            "pack {} provider extension missing or not inline",
1116            pack_path.display()
1117        )
1118    })?;
1119    let provider = inline.providers.first().ok_or_else(|| {
1120        anyhow!(
1121            "pack {} provider extension contains no providers",
1122            pack_path.display()
1123        )
1124    })?;
1125    Ok(provider.provider_type.clone())
1126}
1127
1128/// Read provider_type from pack.manifest.json inside the archive.
1129fn primary_provider_type_from_json(pack_path: &Path) -> anyhow::Result<String> {
1130    let file = std::fs::File::open(pack_path)?;
1131    let mut archive = ZipArchive::new(file)?;
1132    let entry = archive
1133        .by_name("pack.manifest.json")
1134        .map_err(|_| anyhow!("pack.manifest.json not found in {}", pack_path.display()))?;
1135    let manifest: serde_json::Value = serde_json::from_reader(entry)?;
1136    let provider_type = manifest
1137        .pointer("/extensions/0/payload/providers/0/provider_type")
1138        .and_then(serde_json::Value::as_str)
1139        .ok_or_else(|| anyhow!("provider_type not found in pack.manifest.json"))?;
1140    Ok(provider_type.to_string())
1141}
1142
/// Reports whether an error message looks secret-related.
///
/// True when the message contains `SecretsError` (case-sensitive) or
/// `secret store error` (case-insensitive).
fn needs_secret_context(message: &str) -> bool {
    if message.contains("SecretsError") {
        return true;
    }
    message.to_lowercase().contains("secret store error")
}
1147
1148fn secret_error_context(
1149    ctx: &OperatorContext,
1150    provider_id: &str,
1151    op_id: &str,
1152    pack: &ProviderPack,
1153) -> String {
1154    let env = env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1155    let team = secrets_manager::canonical_team(ctx.team.as_deref()).into_owned();
1156    format!(
1157        "secret lookup context env={} tenant={} team={} provider={} flow={} pack_id={} pack_path={}",
1158        env,
1159        ctx.tenant,
1160        team,
1161        provider_id,
1162        op_id,
1163        pack.pack_id,
1164        pack.path.display()
1165    )
1166}
1167
1168fn json_to_canonical_cbor(value: &JsonValue) -> Option<Vec<u8>> {
1169    canonical::to_canonical_cbor_allow_floats(value).ok()
1170}
1171
1172fn decode_hook_response(value: &JsonValue) -> anyhow::Result<HookEvalResponse> {
1173    if let Some(cbor) = extract_cbor_blob(value)
1174        && let Ok(parsed) = serde_cbor::from_slice::<HookEvalResponse>(&cbor)
1175    {
1176        return Ok(parsed);
1177    }
1178    serde_json::from_value(value.clone())
1179        .map_err(|err| anyhow!("hook response is not valid cbor or legacy json: {err}"))
1180}
1181
1182fn extract_cbor_blob(value: &JsonValue) -> Option<Vec<u8>> {
1183    match value {
1184        JsonValue::Array(items) => items
1185            .iter()
1186            .map(|item| item.as_u64().and_then(|n| u8::try_from(n).ok()))
1187            .collect::<Option<Vec<u8>>>(),
1188        JsonValue::String(s) => general_purpose::STANDARD.decode(s).ok(),
1189        JsonValue::Object(map) => {
1190            for key in ["hook_decision_cbor_b64", "cbor_b64", "hook_decision_cbor"] {
1191                let Some(raw) = map.get(key) else {
1192                    continue;
1193                };
1194                if let JsonValue::String(s) = raw
1195                    && let Ok(bytes) = general_purpose::STANDARD.decode(s)
1196                {
1197                    return Some(bytes);
1198                }
1199                if let Some(bytes) = extract_cbor_blob(raw) {
1200                    return Some(bytes);
1201                }
1202            }
1203            None
1204        }
1205        _ => None,
1206    }
1207}
1208
1209fn missing_capability_outcome(
1210    cap_id: &str,
1211    op_name: &str,
1212    component_id: Option<&str>,
1213) -> FlowOutcome {
1214    FlowOutcome {
1215        success: false,
1216        output: Some(json!({
1217            "code": "missing_capability",
1218            "error": {
1219                "type": "MissingCapability",
1220                "cap_id": cap_id,
1221                "op_name": op_name,
1222                "component_id": component_id,
1223            }
1224        })),
1225        raw: None,
1226        error: Some(format!(
1227            "MissingCapability(cap_id={cap_id}, op_name={op_name}, component_id={})",
1228            component_id.unwrap_or("<unknown>")
1229        )),
1230        mode: RunnerExecutionMode::Exec,
1231    }
1232}
1233
1234fn capability_not_installed_outcome(cap_id: &str, op_name: &str, stable_id: &str) -> FlowOutcome {
1235    FlowOutcome {
1236        success: false,
1237        output: Some(json!({
1238            "code": "capability_not_installed",
1239            "error": {
1240                "type": "CapabilityNotInstalled",
1241                "cap_id": cap_id,
1242                "op_name": op_name,
1243                "stable_id": stable_id,
1244            }
1245        })),
1246        raw: None,
1247        error: Some(format!(
1248            "CapabilityNotInstalled(cap_id={cap_id}, op_name={op_name}, stable_id={stable_id})"
1249        )),
1250        mode: RunnerExecutionMode::Exec,
1251    }
1252}
1253
1254fn capability_route_error_outcome(cap_id: &str, op_name: &str, reason: String) -> FlowOutcome {
1255    FlowOutcome {
1256        success: false,
1257        output: Some(json!({
1258            "code": "capability_route_error",
1259            "error": {
1260                "type": "CapabilityRouteError",
1261                "cap_id": cap_id,
1262                "op_name": op_name,
1263                "reason": reason,
1264            }
1265        })),
1266        raw: None,
1267        error: Some(reason),
1268        mode: RunnerExecutionMode::Exec,
1269    }
1270}
1271
1272fn read_transcript_outputs(run_dir: &Path) -> anyhow::Result<Option<JsonValue>> {
1273    let path = run_dir.join("transcript.jsonl");
1274    if !path.exists() {
1275        return Ok(None);
1276    }
1277    let contents = std::fs::read_to_string(path)?;
1278    let mut last = None;
1279    for line in contents.lines() {
1280        let Ok(value) = serde_json::from_str::<JsonValue>(line) else {
1281            continue;
1282        };
1283        let Some(outputs) = value.get("outputs") else {
1284            continue;
1285        };
1286        if !outputs.is_null() {
1287            last = Some(outputs.clone());
1288        }
1289    }
1290    Ok(last)
1291}
1292
1293fn build_demo_host_config(tenant: &str) -> HostConfig {
1294    HostConfig {
1295        tenant: tenant.to_string(),
1296        bindings_path: PathBuf::from("<demo-provider>"),
1297        flow_type_bindings: HashMap::new(),
1298        rate_limits: RateLimits::default(),
1299        retry: FlowRetryConfig::default(),
1300        http_enabled: true,
1301        secrets_policy: SecretsPolicy::allow_all(),
1302        state_store_policy: StateStorePolicy::default(),
1303        webhook_policy: WebhookPolicy::default(),
1304        timers: Vec::new(),
1305        oauth: None,
1306        mocks: None,
1307        pack_bindings: Vec::new(),
1308        env_passthrough: Vec::new(),
1309        trace: TraceConfig::from_env(),
1310        validation: ValidationConfig::from_env(),
1311        operator_policy: OperatorPolicy::allow_all(),
1312    }
1313}
1314
1315fn validate_runner_binary(path: PathBuf) -> Option<PathBuf> {
1316    match fs::metadata(&path) {
1317        Ok(metadata) if metadata.is_file() && runner_binary_is_executable(&metadata) => Some(path),
1318        Ok(metadata) => {
1319            let reason = if !metadata.is_file() {
1320                "not a regular file"
1321            } else {
1322                "not executable"
1323            };
1324            operator_log::warn(
1325                module_path!(),
1326                format!(
1327                    "runner binary '{}' is not usable ({})",
1328                    path.display(),
1329                    reason
1330                ),
1331            );
1332            None
1333        }
1334        Err(err) => {
1335            operator_log::warn(
1336                module_path!(),
1337                format!(
1338                    "runner binary '{}' cannot be accessed: {}",
1339                    path.display(),
1340                    err
1341                ),
1342            );
1343            None
1344        }
1345    }
1346}
1347
1348fn pack_supports_provider_op(pack_path: &Path, op_id: &str) -> anyhow::Result<bool> {
1349    let file = std::fs::File::open(pack_path)?;
1350    let mut archive = ZipArchive::new(file)?;
1351    let mut manifest_entry = archive.by_name("manifest.cbor").map_err(|err| {
1352        anyhow!(
1353            "failed to open manifest.cbor in {}: {err}",
1354            pack_path.display()
1355        )
1356    })?;
1357    let mut bytes = Vec::new();
1358    manifest_entry.read_to_end(&mut bytes)?;
1359    let manifest = decode_pack_manifest(&bytes)
1360        .context("failed to decode pack manifest for op support introspection")?;
1361    if let Some(provider_ext) = manifest.provider_extension_inline() {
1362        if provider_ext
1363            .providers
1364            .iter()
1365            .any(|provider| provider.ops.iter().any(|op| op == op_id))
1366        {
1367            return Ok(true);
1368        }
1369    }
1370    // Fallback: if op is ingest_http, check for messaging-provider-* or messaging-ingress-*
1371    // component in pack. Some pack builds omit ingest_http from provider ops even though
1372    // the provider component implements it.
1373    if op_id == "ingest_http" {
1374        drop(manifest_entry);
1375        drop(bytes);
1376        let file2 = std::fs::File::open(pack_path)?;
1377        let archive2 = ZipArchive::new(file2)?;
1378        for i in 0..archive2.len() {
1379            if let Some(name) = archive2.name_for_index(i) {
1380                if name.ends_with(".wasm")
1381                    && (name.contains("messaging-ingress-") || name.contains("messaging-provider-"))
1382                {
1383                    return Ok(true);
1384                }
1385            }
1386        }
1387    }
1388    Ok(false)
1389}
1390
/// Unix check: true when any execute bit (owner, group or other) is set in
/// the file mode.
#[cfg(unix)]
fn runner_binary_is_executable(metadata: &fs::Metadata) -> bool {
    const ANY_EXEC_BIT: u32 = 0o111;
    let mode = metadata.permissions().mode();
    (mode & ANY_EXEC_BIT) != 0
}
1395
/// Non-Unix fallback: no permission bits are inspected, so every file is
/// reported as executable.
#[cfg(not(unix))]
fn runner_binary_is_executable(_: &fs::Metadata) -> bool {
    true
}
1400
1401fn payload_preview(bytes: &[u8]) -> String {
1402    const MAX_PREVIEW: usize = 256;
1403    if bytes.is_empty() {
1404        return "<empty>".to_string();
1405    }
1406    let preview_len = bytes.len().min(MAX_PREVIEW);
1407    if let Ok(text) = std::str::from_utf8(&bytes[..preview_len]) {
1408        if bytes.len() <= MAX_PREVIEW {
1409            text.to_string()
1410        } else {
1411            format!("{text}...")
1412        }
1413    } else {
1414        let encoded = general_purpose::STANDARD.encode(&bytes[..preview_len]);
1415        if bytes.len() <= MAX_PREVIEW {
1416            encoded
1417        } else {
1418            format!("{encoded}...")
1419        }
1420    }
1421}
1422
1423fn extract_token_validation_request(payload_bytes: &[u8]) -> Option<JsonValue> {
1424    let payload: JsonValue = serde_json::from_slice(payload_bytes).ok()?;
1425    let token = extract_bearer_token(&payload)?;
1426    let mut request = serde_json::Map::new();
1427    request.insert("token".to_string(), JsonValue::String(token));
1428    if let Some(issuer) = first_string_at_paths(
1429        &payload,
1430        &["/token_validation/issuer", "/auth/issuer", "/issuer"],
1431    ) {
1432        request.insert("issuer".to_string(), JsonValue::String(issuer));
1433    }
1434    if let Some(audience) = first_value_at_paths(
1435        &payload,
1436        &["/token_validation/audience", "/auth/audience", "/audience"],
1437    ) {
1438        request.insert("audience".to_string(), normalize_string_or_array(audience));
1439    }
1440    if let Some(scopes) = first_value_at_paths(
1441        &payload,
1442        &[
1443            "/token_validation/scopes",
1444            "/token_validation/required_scopes",
1445            "/auth/scopes",
1446            "/auth/required_scopes",
1447            "/scopes",
1448        ],
1449    ) {
1450        request.insert("scopes".to_string(), normalize_string_or_array(scopes));
1451    }
1452    Some(JsonValue::Object(request))
1453}
1454
1455fn extract_bearer_token(payload: &JsonValue) -> Option<String> {
1456    if let Some(value) = first_string_at_paths(
1457        payload,
1458        &[
1459            "/token_validation/token",
1460            "/auth/token",
1461            "/bearer_token",
1462            "/token",
1463            "/access_token",
1464            "/authorization",
1465        ],
1466    ) && let Some(token) = parse_bearer_value(&value)
1467    {
1468        return Some(token);
1469    }
1470
1471    if let Some(headers) = payload.get("headers")
1472        && let Some(token) = extract_bearer_from_headers(headers)
1473    {
1474        return Some(token);
1475    }
1476
1477    if let Some(value) = payload
1478        .pointer("/metadata/authorization")
1479        .and_then(JsonValue::as_str)
1480        && let Some(token) = parse_bearer_value(value)
1481    {
1482        return Some(token);
1483    }
1484
1485    None
1486}
1487
1488fn extract_bearer_from_headers(headers: &JsonValue) -> Option<String> {
1489    match headers {
1490        JsonValue::Object(map) => {
1491            for key in ["authorization", "Authorization"] {
1492                if let Some(value) = map.get(key).and_then(JsonValue::as_str)
1493                    && let Some(token) = parse_bearer_value(value)
1494                {
1495                    return Some(token);
1496                }
1497            }
1498            None
1499        }
1500        JsonValue::Array(values) => values.iter().find_map(|entry| {
1501            let name = entry
1502                .get("name")
1503                .or_else(|| entry.get("key"))
1504                .and_then(JsonValue::as_str)?;
1505            if !name.eq_ignore_ascii_case("authorization") {
1506                return None;
1507            }
1508            let value = entry.get("value").and_then(JsonValue::as_str)?;
1509            parse_bearer_value(value)
1510        }),
1511        _ => None,
1512    }
1513}
1514
/// Extracts the token from an `Authorization`-style value.
///
/// * Blank input gives `None`.
/// * `Bearer <token>` gives the token. The scheme is matched ASCII
///   case-insensitively and any whitespace may separate it from the token.
/// * A bare `Bearer` with no token gives `None` instead of being mistaken
///   for the token itself.
/// * Anything else is treated as a raw token and returned trimmed.
fn parse_bearer_value(raw: &str) -> Option<String> {
    const SCHEME: &str = "Bearer";

    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return None;
    }
    // Split off the first whitespace-delimited word as the candidate scheme.
    let (scheme, rest) = match trimmed.split_once(char::is_whitespace) {
        Some((scheme, rest)) => (scheme, rest.trim_start()),
        None => (trimmed, ""),
    };
    if scheme.eq_ignore_ascii_case(SCHEME) {
        return (!rest.is_empty()).then(|| rest.to_string());
    }
    Some(trimmed.to_string())
}
1531
1532fn first_string_at_paths(payload: &JsonValue, paths: &[&str]) -> Option<String> {
1533    paths
1534        .iter()
1535        .find_map(|path| payload.pointer(path).and_then(JsonValue::as_str))
1536        .map(str::to_string)
1537}
1538
1539fn first_value_at_paths<'a>(payload: &'a JsonValue, paths: &[&str]) -> Option<&'a JsonValue> {
1540    paths.iter().find_map(|path| payload.pointer(path))
1541}
1542
1543fn normalize_string_or_array(value: &JsonValue) -> JsonValue {
1544    match value {
1545        JsonValue::String(raw) => {
1546            let values = raw
1547                .split_whitespace()
1548                .filter(|entry| !entry.trim().is_empty())
1549                .map(|entry| JsonValue::String(entry.to_string()))
1550                .collect::<Vec<_>>();
1551            JsonValue::Array(values)
1552        }
1553        JsonValue::Array(items) => JsonValue::Array(
1554            items
1555                .iter()
1556                .filter_map(|item| item.as_str())
1557                .map(|item| JsonValue::String(item.to_string()))
1558                .collect(),
1559        ),
1560        _ => JsonValue::Array(Vec::new()),
1561    }
1562}
1563
/// Result of interpreting the output of the token validation capability.
enum TokenValidationDecision {
    /// The token was accepted; carries the claims when the output supplied them.
    Allow(Option<JsonValue>),
    /// The token was rejected; carries the reason reported to the caller.
    Deny(String),
}
1568
1569fn evaluate_token_validation_output(output: &JsonValue) -> TokenValidationDecision {
1570    let valid = output
1571        .get("valid")
1572        .and_then(JsonValue::as_bool)
1573        .or_else(|| output.get("ok").and_then(JsonValue::as_bool))
1574        .unwrap_or(false);
1575    if !valid {
1576        let reason = output
1577            .get("reason")
1578            .and_then(JsonValue::as_str)
1579            .or_else(|| output.get("error").and_then(JsonValue::as_str))
1580            .unwrap_or("invalid bearer token");
1581        return TokenValidationDecision::Deny(reason.to_string());
1582    }
1583    let claims = output
1584        .get("claims")
1585        .filter(|value| value.is_object())
1586        .cloned()
1587        .or_else(|| {
1588            output
1589                .as_object()
1590                .is_some_and(|map| map.contains_key("sub"))
1591                .then(|| output.clone())
1592        });
1593    TokenValidationDecision::Allow(claims)
1594}
1595
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The bearer token comes from the headers; issuer, audience and scopes
    /// come from the `token_validation` section.
    #[test]
    fn token_validation_request_extracts_bearer_and_requirements() {
        let payload_bytes = serde_json::to_vec(&json!({
            "headers": {
                "Authorization": "Bearer token-123"
            },
            "token_validation": {
                "issuer": "https://issuer.example",
                "audience": ["api://svc"],
                "required_scopes": "read write"
            }
        }))
        .expect("payload bytes");

        let request = extract_token_validation_request(&payload_bytes).expect("request");

        let expectations = [
            ("/token", "token-123"),
            ("/issuer", "https://issuer.example"),
            ("/audience/0", "api://svc"),
            ("/scopes/0", "read"),
            ("/scopes/1", "write"),
        ];
        for (pointer, expected) in expectations {
            assert_eq!(
                request.pointer(pointer).and_then(JsonValue::as_str),
                Some(expected)
            );
        }
    }

    /// `valid: false` produces a deny decision carrying the given reason.
    #[test]
    fn token_validation_output_deny_when_invalid() {
        let output = json!({
            "valid": false,
            "reason": "issuer mismatch"
        });
        let TokenValidationDecision::Deny(reason) = evaluate_token_validation_output(&output)
        else {
            panic!("expected deny");
        };
        assert_eq!(reason, "issuer mismatch");
    }

    /// `valid: true` with a `claims` object produces an allow decision that
    /// carries those claims.
    #[test]
    fn token_validation_output_allows_and_returns_claims() {
        let output = json!({
            "valid": true,
            "claims": {
                "sub": "user-1",
                "scope": "read write",
                "aud": ["api://svc"]
            }
        });
        let claims = match evaluate_token_validation_output(&output) {
            TokenValidationDecision::Allow(Some(claims)) => claims,
            TokenValidationDecision::Allow(None) => panic!("expected claims"),
            TokenValidationDecision::Deny(reason) => panic!("unexpected deny: {reason}"),
        };
        assert_eq!(
            claims.pointer("/sub").and_then(JsonValue::as_str),
            Some("user-1")
        );
    }
}