1use std::collections::{BTreeMap, HashMap};
2use std::env;
3use std::fs;
4use std::io::Read;
5#[cfg(unix)]
6use std::os::unix::fs::PermissionsExt;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use anyhow::{Context, anyhow};
11use base64::{Engine as _, engine::general_purpose};
12use greentic_runner_desktop::RunStatus;
13use greentic_runner_host::{
14 RunnerWasiPolicy,
15 component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx},
16 config::{
17 FlowRetryConfig, HostConfig, OperatorPolicy, RateLimits, SecretsPolicy, StateStorePolicy,
18 WebhookPolicy,
19 },
20 pack::{ComponentResolution, PackRuntime},
21 storage::{DynSessionStore, DynStateStore, new_state_store},
22 trace::TraceConfig,
23 validate::ValidationConfig,
24};
25use greentic_types::cbor::canonical;
26use greentic_types::decode_pack_manifest;
27use serde::{Deserialize, Serialize};
28use serde_json::{Value as JsonValue, json};
29use tokio::runtime::Runtime as TokioRuntime;
30use zip::ZipArchive;
31
32fn make_runtime_or_thread_scope<F, T>(f: F) -> T
37where
38 F: FnOnce(&TokioRuntime) -> T + Send,
39 T: Send,
40{
41 if tokio::runtime::Handle::try_current().is_ok() {
42 std::thread::scope(|s| {
43 s.spawn(|| {
44 let rt = TokioRuntime::new().expect("failed to create tokio runtime");
45 f(&rt)
46 })
47 .join()
48 .expect("provider invocation thread panicked")
49 })
50 } else {
51 let rt = TokioRuntime::new().expect("failed to create tokio runtime");
52 f(&rt)
53 }
54}
55
56use crate::runner_exec;
57use crate::runner_integration;
58use crate::runner_integration::RunFlowOptions;
59use crate::runner_integration::RunnerFlavor;
60use crate::runner_integration::run_flow_with_options;
61
62use crate::capabilities::{
63 CAP_OAUTH_BROKER_V1, CAP_OAUTH_TOKEN_VALIDATION_V1, CapabilityBinding, CapabilityInstallRecord,
64 CapabilityPackRecord, CapabilityRegistry, HookStage, OAUTH_OP_AWAIT_RESULT,
65 OAUTH_OP_GET_ACCESS_TOKEN, OAUTH_OP_INITIATE_AUTH, OAUTH_OP_REQUEST_RESOURCE_TOKEN,
66 ResolveScope, is_binding_ready, is_oauth_broker_operation, write_install_record,
67};
68use crate::cards::CardRenderer;
69use crate::discovery;
70use crate::domains::{self, Domain, ProviderPack};
71use crate::operator_log;
72use crate::secrets_gate::{self, DynSecretsManager, SecretsManagerHandle};
73use crate::secrets_manager;
74use crate::state_layout;
75
76#[derive(Clone)]
77pub struct OperatorContext {
78 pub tenant: String,
79 pub team: Option<String>,
80 pub correlation_id: Option<String>,
81}
82
83#[derive(Clone, Copy, Debug, PartialEq, Eq)]
84pub enum RunnerExecutionMode {
85 Exec,
86 Integration,
87}
88
89#[derive(Clone)]
90pub struct FlowOutcome {
91 pub success: bool,
92 pub output: Option<JsonValue>,
93 pub raw: Option<String>,
94 pub error: Option<String>,
95 pub mode: RunnerExecutionMode,
96}
97
98#[derive(Clone, Debug, Serialize, Deserialize)]
99#[serde(rename_all = "snake_case")]
100enum OperationStatus {
101 Pending,
102 Denied,
103 Ok,
104 Err,
105}
106
107#[derive(Clone, Debug, Serialize, Deserialize)]
108struct OperationEnvelopeContext {
109 tenant: String,
110 team: Option<String>,
111 correlation_id: Option<String>,
112 #[serde(default, skip_serializing_if = "Option::is_none")]
113 auth_claims: Option<JsonValue>,
114}
115
116#[derive(Clone, Debug, Serialize, Deserialize)]
117struct OperationEnvelope {
118 op_id: String,
119 op_name: String,
120 ctx: OperationEnvelopeContext,
121 payload_cbor: Vec<u8>,
122 meta_cbor: Option<Vec<u8>>,
123 status: OperationStatus,
124 result_cbor: Option<Vec<u8>>,
125}
126
127impl OperationEnvelope {
128 fn new(op_name: &str, payload: &[u8], ctx: &OperatorContext) -> Self {
129 Self {
130 op_id: uuid::Uuid::new_v4().to_string(),
131 op_name: op_name.to_string(),
132 ctx: OperationEnvelopeContext {
133 tenant: ctx.tenant.clone(),
134 team: ctx.team.clone(),
135 correlation_id: ctx.correlation_id.clone(),
136 auth_claims: None,
137 },
138 payload_cbor: payload.to_vec(),
139 meta_cbor: None,
140 status: OperationStatus::Pending,
141 result_cbor: None,
142 }
143 }
144}
145
146#[derive(Debug, Serialize)]
147struct HookEvalRequest {
148 stage: String,
149 op_name: String,
150 envelope: OperationEnvelope,
151}
152
153#[derive(Debug, Deserialize)]
154struct HookEvalResponse {
155 decision: String,
156 #[serde(default)]
157 reason: Option<String>,
158 #[serde(default)]
159 envelope: Option<OperationEnvelope>,
160}
161
162#[derive(Debug)]
163enum HookChainOutcome {
164 Continue,
165 Denied(String),
166}
167
168#[derive(Clone, Debug)]
169enum RunnerMode {
170 Exec,
171 Integration {
172 binary: PathBuf,
173 flavor: RunnerFlavor,
174 },
175}
176
177#[derive(Clone)]
178pub struct DemoRunnerHost {
179 bundle_root: PathBuf,
180 runner_mode: RunnerMode,
181 catalog: HashMap<(Domain, String), ProviderPack>,
182 packs_by_path: BTreeMap<PathBuf, ProviderPack>,
183 capability_registry: CapabilityRegistry,
184 secrets_handle: SecretsManagerHandle,
185 card_renderer: CardRenderer,
186 state_store: DynStateStore,
187 debug_enabled: bool,
188}
189
190impl DemoRunnerHost {
191 pub fn bundle_root(&self) -> &Path {
192 &self.bundle_root
193 }
194
195 pub fn secrets_manager(&self) -> DynSecretsManager {
196 self.secrets_handle.manager()
197 }
198
199 pub fn secrets_handle(&self) -> &SecretsManagerHandle {
200 &self.secrets_handle
201 }
202
203 pub fn state_store(&self) -> DynStateStore {
204 self.state_store.clone()
205 }
206
207 pub fn set_state_store(&mut self, store: DynStateStore) {
209 self.state_store = store;
210 }
211
212 pub fn new(
213 bundle_root: PathBuf,
214 discovery: &discovery::DiscoveryResult,
215 runner_binary: Option<PathBuf>,
216 secrets_handle: SecretsManagerHandle,
217 debug_enabled: bool,
218 ) -> anyhow::Result<Self> {
219 let runner_binary = runner_binary.and_then(validate_runner_binary);
220 let mode = if let Some(ref binary) = runner_binary {
221 let flavor = runner_integration::detect_runner_flavor(binary);
222 RunnerMode::Integration {
223 binary: binary.clone(),
224 flavor,
225 }
226 } else {
227 RunnerMode::Exec
228 };
229 let mut catalog = HashMap::new();
230 let mut packs_by_path = BTreeMap::new();
231 let mut pack_index: BTreeMap<PathBuf, CapabilityPackRecord> = BTreeMap::new();
232 let provider_map = discovery
233 .providers
234 .iter()
235 .map(|provider| (provider.pack_path.clone(), provider.provider_id.clone()))
236 .collect::<HashMap<_, _>>();
237 for domain in [
238 Domain::Messaging,
239 Domain::Events,
240 Domain::Secrets,
241 Domain::OAuth,
242 ] {
243 let is_demo_bundle = bundle_root.join("greentic.demo.yaml").exists();
244 let packs = if is_demo_bundle {
245 domains::discover_provider_packs_cbor_only(&bundle_root, domain)?
246 } else {
247 domains::discover_provider_packs(&bundle_root, domain)?
248 };
249 for pack in packs {
250 packs_by_path.insert(pack.path.clone(), pack.clone());
251 pack_index.insert(
252 pack.path.clone(),
253 CapabilityPackRecord {
254 pack_id: pack.pack_id.clone(),
255 domain,
256 },
257 );
258 let provider_type = provider_map
259 .get(&pack.path)
260 .cloned()
261 .unwrap_or_else(|| pack.pack_id.clone());
262 catalog.insert((domain, provider_type.clone()), pack.clone());
263 if provider_type != pack.pack_id {
264 catalog.insert((domain, pack.pack_id.clone()), pack.clone());
265 }
266 }
267 }
268 let capability_registry = CapabilityRegistry::build_from_pack_index(&pack_index)?;
269 Ok(Self {
270 bundle_root,
271 runner_mode: mode,
272 catalog,
273 packs_by_path,
274 capability_registry,
275 secrets_handle,
276 card_renderer: CardRenderer::new(),
277 state_store: new_state_store(),
278 debug_enabled,
279 })
280 }
281
282 pub fn debug_enabled(&self) -> bool {
283 self.debug_enabled
284 }
285
286 pub fn canonical_provider_type(&self, domain: Domain, lookup_key: &str) -> String {
290 if let Some(pack) = self.catalog.get(&(domain, lookup_key.to_string())) {
291 primary_provider_type(&pack.path).unwrap_or_else(|_| lookup_key.to_string())
292 } else {
293 lookup_key.to_string()
294 }
295 }
296
297 pub fn resolve_capability(
298 &self,
299 cap_id: &str,
300 min_version: Option<&str>,
301 scope: ResolveScope,
302 ) -> Option<CapabilityBinding> {
303 self.capability_registry
304 .resolve(cap_id, min_version, &scope)
305 }
306
307 pub fn resolve_hook_chain(&self, stage: HookStage, op_name: &str) -> Vec<CapabilityBinding> {
308 self.capability_registry.resolve_hook_chain(stage, op_name)
309 }
310
311 pub fn has_provider_packs_for_domain(&self, domain: Domain) -> bool {
312 self.catalog
313 .keys()
314 .any(|(entry_domain, _)| *entry_domain == domain)
315 }
316
317 pub fn capability_setup_plan(&self, ctx: &OperatorContext) -> Vec<CapabilityBinding> {
318 let scope = ResolveScope {
319 env: env::var("GREENTIC_ENV").ok(),
320 tenant: Some(ctx.tenant.clone()),
321 team: ctx.team.clone(),
322 };
323 self.capability_registry
324 .offers_requiring_setup(&scope)
325 .into_iter()
326 .map(|offer| CapabilityBinding {
327 cap_id: offer.cap_id,
328 stable_id: offer.stable_id,
329 pack_id: offer.pack_id,
330 domain: offer.domain,
331 pack_path: offer.pack_path,
332 provider_component_ref: offer.provider_component_ref,
333 provider_op: offer.provider_op,
334 version: offer.version,
335 requires_setup: offer.requires_setup,
336 setup_qa_ref: offer.setup_qa_ref,
337 })
338 .collect()
339 }
340
341 pub fn mark_capability_ready(
342 &self,
343 ctx: &OperatorContext,
344 binding: &CapabilityBinding,
345 ) -> anyhow::Result<PathBuf> {
346 let record =
347 CapabilityInstallRecord::ready(&binding.cap_id, &binding.stable_id, &binding.pack_id);
348 write_install_record(&self.bundle_root, &ctx.tenant, ctx.team.as_deref(), &record)
349 }
350
351 pub fn mark_capability_failed(
352 &self,
353 ctx: &OperatorContext,
354 binding: &CapabilityBinding,
355 failure_key: &str,
356 ) -> anyhow::Result<PathBuf> {
357 let record = CapabilityInstallRecord::failed(
358 &binding.cap_id,
359 &binding.stable_id,
360 &binding.pack_id,
361 failure_key,
362 );
363 write_install_record(&self.bundle_root, &ctx.tenant, ctx.team.as_deref(), &record)
364 }
365
366 pub fn invoke_capability(
367 &self,
368 cap_id: &str,
369 op: &str,
370 payload_bytes: &[u8],
371 ctx: &OperatorContext,
372 ) -> anyhow::Result<FlowOutcome> {
373 let _span = tracing::info_span!(
374 "invoke_capability",
375 cap_id = %cap_id,
376 op = %op,
377 tenant = %ctx.tenant,
378 )
379 .entered();
380 let requested_op = op.trim();
381 if cap_id == CAP_OAUTH_BROKER_V1 {
382 if requested_op.is_empty() {
383 return Ok(capability_route_error_outcome(
384 cap_id,
385 "<missing-op>",
386 format!(
387 "oauth broker capability requires an explicit op (supported: {}, {}, {}, {})",
388 OAUTH_OP_INITIATE_AUTH,
389 OAUTH_OP_AWAIT_RESULT,
390 OAUTH_OP_GET_ACCESS_TOKEN,
391 OAUTH_OP_REQUEST_RESOURCE_TOKEN
392 ),
393 ));
394 }
395 if !is_oauth_broker_operation(requested_op) {
396 return Ok(capability_route_error_outcome(
397 cap_id,
398 requested_op,
399 format!(
400 "unsupported oauth broker op `{requested_op}` (supported: {}, {}, {}, {})",
401 OAUTH_OP_INITIATE_AUTH,
402 OAUTH_OP_AWAIT_RESULT,
403 OAUTH_OP_GET_ACCESS_TOKEN,
404 OAUTH_OP_REQUEST_RESOURCE_TOKEN
405 ),
406 ));
407 }
408 }
409 let scope = ResolveScope {
410 env: env::var("GREENTIC_ENV").ok(),
411 tenant: Some(ctx.tenant.clone()),
412 team: ctx.team.clone(),
413 };
414 let binding = if requested_op.is_empty() {
415 self.resolve_capability(cap_id, None, scope)
416 } else {
417 self.capability_registry
418 .resolve_for_op(cap_id, None, &scope, Some(requested_op))
419 };
420 let Some(binding) = binding else {
421 return Ok(missing_capability_outcome(cap_id, op, None));
422 };
423 if !is_binding_ready(
424 &self.bundle_root,
425 &ctx.tenant,
426 ctx.team.as_deref(),
427 &binding,
428 )? {
429 return Ok(capability_not_installed_outcome(
430 cap_id,
431 op,
432 &binding.stable_id,
433 ));
434 }
435
436 let Some(pack) = self.packs_by_path.get(&binding.pack_path) else {
437 return Ok(capability_route_error_outcome(
438 cap_id,
439 op,
440 format!("resolved pack not found at {}", binding.pack_path.display()),
441 ));
442 };
443
444 let target_op = if cap_id == CAP_OAUTH_BROKER_V1 || requested_op.is_empty() {
445 binding.provider_op.as_str()
447 } else {
448 requested_op
449 };
450
451 let mut envelope =
453 OperationEnvelope::new(&format!("cap.invoke:{cap_id}"), payload_bytes, ctx);
454 let token_validation_outcome =
455 self.evaluate_token_validation_pre_hook(&mut envelope, payload_bytes, ctx)?;
456 if let HookChainOutcome::Denied(reason) = token_validation_outcome {
457 envelope.status = OperationStatus::Denied;
458 self.emit_post_sub(&envelope);
459 return Ok(capability_route_error_outcome(
460 cap_id,
461 target_op,
462 format!("operation denied by pre-hook: {reason}"),
463 ));
464 }
465 let pre_chain = self.resolve_hook_chain(HookStage::Pre, &envelope.op_name);
466 let pre_hook_outcome =
467 self.evaluate_hook_chain(&pre_chain, HookStage::Pre, &mut envelope)?;
468 self.emit_pre_sub(&envelope);
469 if let HookChainOutcome::Denied(reason) = pre_hook_outcome {
470 envelope.status = OperationStatus::Denied;
471 self.emit_post_sub(&envelope);
472 return Ok(capability_route_error_outcome(
473 cap_id,
474 target_op,
475 format!("operation denied by pre-hook: {reason}"),
476 ));
477 }
478
479 let outcome = self.invoke_provider_component_op(
480 binding.domain,
481 pack,
482 &binding.pack_id,
483 target_op,
484 payload_bytes,
485 ctx,
486 )?;
487
488 envelope.status = if outcome.success {
489 OperationStatus::Ok
490 } else {
491 OperationStatus::Err
492 };
493 envelope.result_cbor = outcome.output.as_ref().and_then(json_to_canonical_cbor);
494 let post_chain = self.resolve_hook_chain(HookStage::Post, &envelope.op_name);
495 let _ = self.evaluate_hook_chain(&post_chain, HookStage::Post, &mut envelope)?;
496 self.emit_post_sub(&envelope);
497 Ok(outcome)
498 }
499
500 pub fn supports_op(&self, domain: Domain, provider_type: &str, op_id: &str) -> bool {
501 self.catalog
502 .get(&(domain, provider_type.to_string()))
503 .map(|pack| {
504 pack.entry_flows.iter().any(|flow| flow == op_id)
505 || pack_supports_provider_op(&pack.path, op_id).unwrap_or(false)
506 })
507 .unwrap_or(false)
508 }
509
510 pub fn invoke_provider_op(
511 &self,
512 domain: Domain,
513 provider_type: &str,
514 op_id: &str,
515 payload_bytes: &[u8],
516 ctx: &OperatorContext,
517 ) -> anyhow::Result<FlowOutcome> {
518 let _span = tracing::info_span!(
519 "invoke_provider_op",
520 provider = %provider_type,
521 op = %op_id,
522 tenant = %ctx.tenant,
523 )
524 .entered();
525 let mut envelope = OperationEnvelope::new(op_id, payload_bytes, ctx);
526 let token_validation_outcome =
527 self.evaluate_token_validation_pre_hook(&mut envelope, payload_bytes, ctx)?;
528 if let HookChainOutcome::Denied(reason) = token_validation_outcome {
529 envelope.status = OperationStatus::Denied;
530 self.emit_pre_sub(&envelope);
531 self.emit_post_sub(&envelope);
532 return Ok(FlowOutcome {
533 success: false,
534 output: None,
535 raw: None,
536 error: Some(format!("operation denied by pre-hook: {reason}")),
537 mode: RunnerExecutionMode::Exec,
538 });
539 }
540 let pre_chain = self.resolve_hook_chain(HookStage::Pre, op_id);
541 let pre_hook_outcome =
542 self.evaluate_hook_chain(&pre_chain, HookStage::Pre, &mut envelope)?;
543 self.emit_pre_sub(&envelope);
544 if let HookChainOutcome::Denied(reason) = pre_hook_outcome {
545 envelope.status = OperationStatus::Denied;
546 self.emit_post_sub(&envelope);
547 return Ok(FlowOutcome {
548 success: false,
549 output: Some(serde_json::to_value(&envelope).unwrap_or_else(|_| json!({}))),
550 raw: None,
551 error: Some(format!("operation denied by pre-hook: {reason}")),
552 mode: RunnerExecutionMode::Exec,
553 });
554 }
555
556 let outcome =
557 self.invoke_provider_op_inner(domain, provider_type, op_id, payload_bytes, ctx)?;
558 envelope.status = if outcome.success {
559 OperationStatus::Ok
560 } else {
561 OperationStatus::Err
562 };
563 envelope.result_cbor = outcome.output.as_ref().and_then(json_to_canonical_cbor);
564
565 let post_chain = self.resolve_hook_chain(HookStage::Post, op_id);
566 let _ = self.evaluate_hook_chain(&post_chain, HookStage::Post, &mut envelope)?;
567 self.emit_post_sub(&envelope);
568 Ok(outcome)
569 }
570
571 fn invoke_provider_op_inner(
572 &self,
573 domain: Domain,
574 provider_type: &str,
575 op_id: &str,
576 payload_bytes: &[u8],
577 ctx: &OperatorContext,
578 ) -> anyhow::Result<FlowOutcome> {
579 let pack = self
580 .catalog
581 .get(&(domain, provider_type.to_string()))
582 .ok_or_else(|| {
583 anyhow::anyhow!(
584 "provider {} not found for domain {}",
585 provider_type,
586 domains::domain_name(domain)
587 )
588 })?;
589
590 if pack.entry_flows.iter().any(|flow| flow == op_id) {
591 let flow_id = op_id;
592 if self.debug_enabled {
593 operator_log::debug(
594 module_path!(),
595 format!(
596 "[demo dev] invoking provider domain={} provider={} flow={} tenant={} team={} payload_len={} preview={}",
597 domains::domain_name(domain),
598 provider_type,
599 flow_id,
600 ctx.tenant,
601 ctx.team.as_deref().unwrap_or("default"),
602 payload_bytes.len(),
603 payload_preview(payload_bytes),
604 ),
605 );
606 }
607 let run_dir = state_layout::run_dir(&self.bundle_root, domain, &pack.pack_id, flow_id)?;
608 std::fs::create_dir_all(&run_dir)?;
609
610 let render_outcome = self.card_renderer.render_if_needed(
611 provider_type,
612 payload_bytes,
613 |cap_id, op, input| {
614 let outcome = self.invoke_capability(cap_id, op, input, ctx)?;
615 if !outcome.success {
616 let reason = outcome
617 .error
618 .clone()
619 .or(outcome.raw.clone())
620 .unwrap_or_else(|| "capability invocation failed".to_string());
621 return Err(anyhow!(
622 "card capability {}:{} failed: {}",
623 cap_id,
624 op,
625 reason
626 ));
627 }
628 outcome.output.ok_or_else(|| {
629 anyhow!(
630 "card capability {}:{} returned no structured output",
631 cap_id,
632 op
633 )
634 })
635 },
636 )?;
637 let payload = serde_json::from_slice(&render_outcome.bytes).unwrap_or_else(|_| {
638 json!({
639 "payload": general_purpose::STANDARD.encode(&render_outcome.bytes)
640 })
641 });
642
643 let outcome = match &self.runner_mode {
644 RunnerMode::Exec => {
645 self.execute_with_runner_exec(domain, pack, flow_id, &payload, ctx, &run_dir)?
646 }
647 RunnerMode::Integration { binary, flavor } => self
648 .execute_with_runner_integration(
649 domain, pack, flow_id, &payload, ctx, &run_dir, binary, *flavor,
650 )?,
651 };
652
653 if self.debug_enabled {
654 operator_log::debug(
655 module_path!(),
656 format!(
657 "[demo dev] provider={} flow={} tenant={} team={} success={} mode={:?} error={:?} corr_id={}",
658 provider_type,
659 flow_id,
660 ctx.tenant,
661 ctx.team.as_deref().unwrap_or("default"),
662 outcome.success,
663 outcome.mode,
664 outcome.error,
665 ctx.correlation_id.as_deref().unwrap_or("none"),
666 ),
667 );
668 }
669 operator_log::info(
670 module_path!(),
671 format!(
672 "invoke domain={} provider={} op={} mode={:?} corr={}",
673 domains::domain_name(domain),
674 provider_type,
675 flow_id,
676 outcome.mode,
677 ctx.correlation_id.as_deref().unwrap_or("none")
678 ),
679 );
680
681 return Ok(outcome);
682 }
683
684 self.invoke_provider_component_op(domain, pack, provider_type, op_id, payload_bytes, ctx)
685 }
686
687 fn evaluate_hook_chain(
688 &self,
689 chain: &[CapabilityBinding],
690 stage: HookStage,
691 envelope: &mut OperationEnvelope,
692 ) -> anyhow::Result<HookChainOutcome> {
693 for binding in chain {
694 let Some(pack) = self.packs_by_path.get(&binding.pack_path) else {
695 operator_log::warn(
696 module_path!(),
697 format!(
698 "hook binding skipped; pack not found stable_id={} path={}",
699 binding.stable_id,
700 binding.pack_path.display()
701 ),
702 );
703 continue;
704 };
705
706 let payload = canonical::to_canonical_cbor(&HookEvalRequest {
707 stage: match stage {
708 HookStage::Pre => "pre",
709 HookStage::Post => "post",
710 }
711 .to_string(),
712 op_name: envelope.op_name.clone(),
713 envelope: envelope.clone(),
714 })
715 .map_err(|err| anyhow!("failed to encode hook request as cbor: {err}"))?;
716 let ctx = OperatorContext {
717 tenant: envelope.ctx.tenant.clone(),
718 team: envelope.ctx.team.clone(),
719 correlation_id: envelope.ctx.correlation_id.clone(),
720 };
721 let outcome = self.invoke_provider_component_op(
722 binding.domain,
723 pack,
724 &binding.pack_id,
725 &binding.provider_op,
726 &payload,
727 &ctx,
728 )?;
729 if !outcome.success {
730 operator_log::warn(
731 module_path!(),
732 format!(
733 "hook invocation failed stage={:?} binding={} err={}",
734 stage,
735 binding.stable_id,
736 outcome.error.unwrap_or_else(|| "unknown error".to_string())
737 ),
738 );
739 continue;
740 }
741 let Some(output) = outcome.output else {
742 continue;
743 };
744 let parsed: HookEvalResponse = match decode_hook_response(&output) {
745 Ok(value) => value,
746 Err(err) => {
747 operator_log::warn(
748 module_path!(),
749 format!(
750 "hook response decode failed stage={:?} binding={} err={} (expected cbor, with legacy json fallback)",
751 stage, binding.stable_id, err
752 ),
753 );
754 continue;
755 }
756 };
757 if let Some(updated) = parsed.envelope {
758 *envelope = updated;
759 }
760 if parsed.decision.eq_ignore_ascii_case("deny") && matches!(stage, HookStage::Pre) {
761 let reason = parsed
762 .reason
763 .unwrap_or_else(|| "hook denied operation".to_string());
764 return Ok(HookChainOutcome::Denied(reason));
765 }
766 }
767 Ok(HookChainOutcome::Continue)
768 }
769
770 fn evaluate_token_validation_pre_hook(
771 &self,
772 envelope: &mut OperationEnvelope,
773 payload_bytes: &[u8],
774 ctx: &OperatorContext,
775 ) -> anyhow::Result<HookChainOutcome> {
776 if envelope
777 .op_name
778 .starts_with(&format!("cap.invoke:{CAP_OAUTH_TOKEN_VALIDATION_V1}"))
779 {
780 return Ok(HookChainOutcome::Continue);
781 }
782 let Some(validation_request) = extract_token_validation_request(payload_bytes) else {
783 return Ok(HookChainOutcome::Continue);
784 };
785 let scope = ResolveScope {
786 env: env::var("GREENTIC_ENV").ok(),
787 tenant: Some(ctx.tenant.clone()),
788 team: ctx.team.clone(),
789 };
790 let Some(binding) = self.resolve_capability(CAP_OAUTH_TOKEN_VALIDATION_V1, None, scope)
791 else {
792 return Ok(HookChainOutcome::Continue);
793 };
794 if !is_binding_ready(
795 &self.bundle_root,
796 &ctx.tenant,
797 ctx.team.as_deref(),
798 &binding,
799 )? {
800 return Ok(HookChainOutcome::Denied(format!(
801 "token validation capability is not installed (stable_id={})",
802 binding.stable_id
803 )));
804 }
805 let Some(pack) = self.packs_by_path.get(&binding.pack_path) else {
806 return Ok(HookChainOutcome::Denied(format!(
807 "token validation pack not found at {}",
808 binding.pack_path.display()
809 )));
810 };
811 let request_bytes = serde_json::to_vec(&validation_request)
812 .map_err(|err| anyhow!("failed to encode token validation payload: {err}"))?;
813 let outcome = self.invoke_provider_component_op(
814 binding.domain,
815 pack,
816 &binding.pack_id,
817 &binding.provider_op,
818 &request_bytes,
819 ctx,
820 )?;
821 if !outcome.success {
822 let reason = outcome
823 .error
824 .unwrap_or_else(|| "token validation capability invocation failed".to_string());
825 return Ok(HookChainOutcome::Denied(reason));
826 }
827 let Some(output) = outcome.output else {
828 return Ok(HookChainOutcome::Denied(
829 "token validation returned no output".to_string(),
830 ));
831 };
832 match evaluate_token_validation_output(&output) {
833 TokenValidationDecision::Allow(claims) => {
834 envelope.ctx.auth_claims = claims;
835 Ok(HookChainOutcome::Continue)
836 }
837 TokenValidationDecision::Deny(reason) => Ok(HookChainOutcome::Denied(reason)),
838 }
839 }
840
841 fn emit_pre_sub(&self, envelope: &OperationEnvelope) {
842 operator_log::info(
843 module_path!(),
844 format!(
845 "sub.pre op={} status={:?} tenant={} team={}",
846 envelope.op_name,
847 envelope.status,
848 envelope.ctx.tenant,
849 envelope.ctx.team.as_deref().unwrap_or("default")
850 ),
851 );
852 }
853
854 fn emit_post_sub(&self, envelope: &OperationEnvelope) {
855 operator_log::info(
856 module_path!(),
857 format!(
858 "sub.post op={} status={:?} tenant={} team={}",
859 envelope.op_name,
860 envelope.status,
861 envelope.ctx.tenant,
862 envelope.ctx.team.as_deref().unwrap_or("default")
863 ),
864 );
865 }
866
867 fn execute_with_runner_exec(
868 &self,
869 domain: Domain,
870 pack: &ProviderPack,
871 flow_id: &str,
872 payload: &JsonValue,
873 ctx: &OperatorContext,
874 _run_dir: &Path,
875 ) -> anyhow::Result<FlowOutcome> {
876 let request = runner_exec::RunRequest {
877 root: self.bundle_root.clone(),
878 domain,
879 pack_path: pack.path.clone(),
880 pack_label: pack.pack_id.clone(),
881 flow_id: flow_id.to_string(),
882 tenant: ctx.tenant.clone(),
883 team: ctx.team.clone(),
884 input: payload.clone(),
885 dist_offline: true,
886 };
887 let run_output = runner_exec::run_provider_pack_flow(request)?;
888 let parsed = read_transcript_outputs(&run_output.run_dir)?;
889 Ok(FlowOutcome {
890 success: run_output.result.status == RunStatus::Success,
891 output: parsed,
892 raw: None,
893 error: run_output.result.error.clone(),
894 mode: RunnerExecutionMode::Exec,
895 })
896 }
897
898 #[allow(clippy::too_many_arguments)]
899 fn execute_with_runner_integration(
900 &self,
901 _domain: Domain,
902 pack: &ProviderPack,
903 flow_id: &str,
904 payload: &JsonValue,
905 ctx: &OperatorContext,
906 run_dir: &Path,
907 runner_binary: &Path,
908 flavor: RunnerFlavor,
909 ) -> anyhow::Result<FlowOutcome> {
910 let output = run_flow_with_options(
911 runner_binary,
912 &pack.path,
913 flow_id,
914 payload,
915 RunFlowOptions {
916 dist_offline: true,
917 tenant: Some(&ctx.tenant),
918 team: ctx.team.as_deref(),
919 artifacts_dir: Some(run_dir),
920 runner_flavor: flavor,
921 },
922 )?;
923 let mut parsed = output.parsed.clone();
924 if parsed.is_none() {
925 parsed = read_transcript_outputs(run_dir)?;
926 }
927 let raw = if output.stdout.trim().is_empty() {
928 None
929 } else {
930 Some(output.stdout.clone())
931 };
932 Ok(FlowOutcome {
933 success: output.status.success(),
934 output: parsed,
935 raw,
936 error: if output.status.success() {
937 None
938 } else {
939 Some(output.stderr.clone())
940 },
941 mode: RunnerExecutionMode::Integration,
942 })
943 }
944
945 pub fn invoke_provider_component_op_direct(
946 &self,
947 domain: Domain,
948 pack: &ProviderPack,
949 provider_id: &str,
950 op_id: &str,
951 payload_bytes: &[u8],
952 ctx: &OperatorContext,
953 ) -> anyhow::Result<FlowOutcome> {
954 self.invoke_provider_component_op(domain, pack, provider_id, op_id, payload_bytes, ctx)
955 }
956
957 fn invoke_provider_component_op(
958 &self,
959 domain: Domain,
960 pack: &ProviderPack,
961 provider_id: &str,
962 op_id: &str,
963 payload_bytes: &[u8],
964 ctx: &OperatorContext,
965 ) -> anyhow::Result<FlowOutcome> {
966 if let RunnerMode::Integration { binary, flavor } = &self.runner_mode {
967 let payload_value: JsonValue =
968 serde_json::from_slice(payload_bytes).unwrap_or_else(|_| json!({}));
969 let run_dir = state_layout::run_dir(&self.bundle_root, domain, &pack.pack_id, op_id)?;
970 std::fs::create_dir_all(&run_dir)?;
971 return self.execute_with_runner_integration(
972 domain,
973 pack,
974 op_id,
975 &payload_value,
976 ctx,
977 &run_dir,
978 binary,
979 *flavor,
980 );
981 }
982
983 let payload = payload_bytes.to_vec();
984 let result = make_runtime_or_thread_scope(|runtime| {
985 runtime.block_on(async {
986 let host_config = Arc::new(build_demo_host_config(&ctx.tenant));
987 let fresh_secrets = secrets_gate::resolve_secrets_manager(
990 &self.bundle_root,
991 &ctx.tenant,
992 ctx.team.as_deref(),
993 )
994 .unwrap_or_else(|_| self.secrets_handle.clone());
995 let dev_store_display = fresh_secrets
996 .dev_store_path
997 .as_ref()
998 .map(|path| path.display().to_string())
999 .unwrap_or_else(|| "<default>".to_string());
1000 operator_log::info(
1001 module_path!(),
1002 format!(
1003 "secrets backend for wasm: using_env_fallback={} dev_store={}",
1004 fresh_secrets.using_env_fallback, dev_store_display,
1005 ),
1006 );
1007 operator_log::info(
1008 module_path!(),
1009 format!(
1010 "exec secrets: dev_store={} env_fallback={}",
1011 dev_store_display, fresh_secrets.using_env_fallback,
1012 ),
1013 );
1014 let pack_runtime = PackRuntime::load(
1015 &pack.path,
1016 host_config.clone(),
1017 None,
1018 Some(&pack.path),
1019 None::<DynSessionStore>,
1020 Some(self.state_store.clone()),
1021 Arc::new(RunnerWasiPolicy::default()),
1022 fresh_secrets.runtime_manager(Some(&pack.pack_id)),
1023 None,
1024 false,
1025 ComponentResolution::default(),
1026 )
1027 .await?;
1028 let provider_type = primary_provider_type(&pack.path)
1029 .context("failed to determine provider type for direct invocation")?;
1030 let _env_value = env::var("GREENTIC_ENV").unwrap_or_else(|_| "<unset>".to_string());
1031 let _canonical_team =
1032 secrets_manager::canonical_team(ctx.team.as_deref()).into_owned();
1033 let _runner_dev_store_desc = self
1034 .secrets_handle
1035 .dev_store_path
1036 .as_ref()
1037 .map(|path| path.display().to_string())
1038 .unwrap_or_else(|| "<none>".to_string());
1039 let binding = pack_runtime.resolve_provider(None, Some(&provider_type))?;
1040 let exec_ctx = ComponentExecCtx {
1041 tenant: ComponentTenantCtx {
1042 tenant: ctx.tenant.clone(),
1043 team: ctx.team.clone(),
1044 i18n_id: None,
1045 user: None,
1046 trace_id: None,
1047 correlation_id: ctx.correlation_id.clone(),
1048 deadline_unix_ms: None,
1049 attempt: 1,
1050 idempotency_key: None,
1051 },
1052 i18n_id: None,
1053 flow_id: op_id.to_string(),
1054 node_id: Some(op_id.to_string()),
1055 };
1056 pack_runtime
1057 .invoke_provider(&binding, exec_ctx, op_id, payload)
1058 .await
1059 })
1060 });
1061
1062 match result {
1063 Ok(value) => Ok(FlowOutcome {
1064 success: true,
1065 output: Some(value),
1066 raw: None,
1067 error: None,
1068 mode: RunnerExecutionMode::Exec,
1069 }),
1070 Err(err) => {
1071 let err_message = err.to_string();
1072 let needs_context = needs_secret_context(&err_message);
1073 let enriched_err = if needs_context {
1074 err.context(secret_error_context(ctx, provider_id, op_id, pack))
1075 } else {
1076 err
1077 };
1078 let error_text = if needs_context {
1079 enriched_err.to_string()
1080 } else {
1081 err_message
1082 };
1083 Ok(FlowOutcome {
1084 success: false,
1085 output: None,
1086 raw: None,
1087 error: Some(error_text),
1088 mode: RunnerExecutionMode::Exec,
1089 })
1090 }
1091 }
1092 }
1093}
1094
1095pub fn primary_provider_type(pack_path: &Path) -> anyhow::Result<String> {
1096 if let Ok(json_type) = primary_provider_type_from_json(pack_path) {
1099 return Ok(json_type);
1100 }
1101 let file = std::fs::File::open(pack_path)?;
1102 let mut archive = ZipArchive::new(file)?;
1103 let mut manifest_entry = archive.by_name("manifest.cbor").map_err(|err| {
1104 anyhow!(
1105 "failed to open manifest.cbor in {}: {err}",
1106 pack_path.display()
1107 )
1108 })?;
1109 let mut bytes = Vec::new();
1110 manifest_entry.read_to_end(&mut bytes)?;
1111 let manifest = decode_pack_manifest(&bytes)
1112 .context("failed to decode pack manifest for provider introspection")?;
1113 let inline = manifest.provider_extension_inline().ok_or_else(|| {
1114 anyhow!(
1115 "pack {} provider extension missing or not inline",
1116 pack_path.display()
1117 )
1118 })?;
1119 let provider = inline.providers.first().ok_or_else(|| {
1120 anyhow!(
1121 "pack {} provider extension contains no providers",
1122 pack_path.display()
1123 )
1124 })?;
1125 Ok(provider.provider_type.clone())
1126}
1127
1128fn primary_provider_type_from_json(pack_path: &Path) -> anyhow::Result<String> {
1130 let file = std::fs::File::open(pack_path)?;
1131 let mut archive = ZipArchive::new(file)?;
1132 let entry = archive
1133 .by_name("pack.manifest.json")
1134 .map_err(|_| anyhow!("pack.manifest.json not found in {}", pack_path.display()))?;
1135 let manifest: serde_json::Value = serde_json::from_reader(entry)?;
1136 let provider_type = manifest
1137 .pointer("/extensions/0/payload/providers/0/provider_type")
1138 .and_then(serde_json::Value::as_str)
1139 .ok_or_else(|| anyhow!("provider_type not found in pack.manifest.json"))?;
1140 Ok(provider_type.to_string())
1141}
1142
1143fn needs_secret_context(message: &str) -> bool {
1144 let lower = message.to_lowercase();
1145 lower.contains("secret store error") || message.contains("SecretsError")
1146}
1147
1148fn secret_error_context(
1149 ctx: &OperatorContext,
1150 provider_id: &str,
1151 op_id: &str,
1152 pack: &ProviderPack,
1153) -> String {
1154 let env = env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1155 let team = secrets_manager::canonical_team(ctx.team.as_deref()).into_owned();
1156 format!(
1157 "secret lookup context env={} tenant={} team={} provider={} flow={} pack_id={} pack_path={}",
1158 env,
1159 ctx.tenant,
1160 team,
1161 provider_id,
1162 op_id,
1163 pack.pack_id,
1164 pack.path.display()
1165 )
1166}
1167
1168fn json_to_canonical_cbor(value: &JsonValue) -> Option<Vec<u8>> {
1169 canonical::to_canonical_cbor_allow_floats(value).ok()
1170}
1171
1172fn decode_hook_response(value: &JsonValue) -> anyhow::Result<HookEvalResponse> {
1173 if let Some(cbor) = extract_cbor_blob(value)
1174 && let Ok(parsed) = serde_cbor::from_slice::<HookEvalResponse>(&cbor)
1175 {
1176 return Ok(parsed);
1177 }
1178 serde_json::from_value(value.clone())
1179 .map_err(|err| anyhow!("hook response is not valid cbor or legacy json: {err}"))
1180}
1181
1182fn extract_cbor_blob(value: &JsonValue) -> Option<Vec<u8>> {
1183 match value {
1184 JsonValue::Array(items) => items
1185 .iter()
1186 .map(|item| item.as_u64().and_then(|n| u8::try_from(n).ok()))
1187 .collect::<Option<Vec<u8>>>(),
1188 JsonValue::String(s) => general_purpose::STANDARD.decode(s).ok(),
1189 JsonValue::Object(map) => {
1190 for key in ["hook_decision_cbor_b64", "cbor_b64", "hook_decision_cbor"] {
1191 let Some(raw) = map.get(key) else {
1192 continue;
1193 };
1194 if let JsonValue::String(s) = raw
1195 && let Ok(bytes) = general_purpose::STANDARD.decode(s)
1196 {
1197 return Some(bytes);
1198 }
1199 if let Some(bytes) = extract_cbor_blob(raw) {
1200 return Some(bytes);
1201 }
1202 }
1203 None
1204 }
1205 _ => None,
1206 }
1207}
1208
1209fn missing_capability_outcome(
1210 cap_id: &str,
1211 op_name: &str,
1212 component_id: Option<&str>,
1213) -> FlowOutcome {
1214 FlowOutcome {
1215 success: false,
1216 output: Some(json!({
1217 "code": "missing_capability",
1218 "error": {
1219 "type": "MissingCapability",
1220 "cap_id": cap_id,
1221 "op_name": op_name,
1222 "component_id": component_id,
1223 }
1224 })),
1225 raw: None,
1226 error: Some(format!(
1227 "MissingCapability(cap_id={cap_id}, op_name={op_name}, component_id={})",
1228 component_id.unwrap_or("<unknown>")
1229 )),
1230 mode: RunnerExecutionMode::Exec,
1231 }
1232}
1233
1234fn capability_not_installed_outcome(cap_id: &str, op_name: &str, stable_id: &str) -> FlowOutcome {
1235 FlowOutcome {
1236 success: false,
1237 output: Some(json!({
1238 "code": "capability_not_installed",
1239 "error": {
1240 "type": "CapabilityNotInstalled",
1241 "cap_id": cap_id,
1242 "op_name": op_name,
1243 "stable_id": stable_id,
1244 }
1245 })),
1246 raw: None,
1247 error: Some(format!(
1248 "CapabilityNotInstalled(cap_id={cap_id}, op_name={op_name}, stable_id={stable_id})"
1249 )),
1250 mode: RunnerExecutionMode::Exec,
1251 }
1252}
1253
1254fn capability_route_error_outcome(cap_id: &str, op_name: &str, reason: String) -> FlowOutcome {
1255 FlowOutcome {
1256 success: false,
1257 output: Some(json!({
1258 "code": "capability_route_error",
1259 "error": {
1260 "type": "CapabilityRouteError",
1261 "cap_id": cap_id,
1262 "op_name": op_name,
1263 "reason": reason,
1264 }
1265 })),
1266 raw: None,
1267 error: Some(reason),
1268 mode: RunnerExecutionMode::Exec,
1269 }
1270}
1271
1272fn read_transcript_outputs(run_dir: &Path) -> anyhow::Result<Option<JsonValue>> {
1273 let path = run_dir.join("transcript.jsonl");
1274 if !path.exists() {
1275 return Ok(None);
1276 }
1277 let contents = std::fs::read_to_string(path)?;
1278 let mut last = None;
1279 for line in contents.lines() {
1280 let Ok(value) = serde_json::from_str::<JsonValue>(line) else {
1281 continue;
1282 };
1283 let Some(outputs) = value.get("outputs") else {
1284 continue;
1285 };
1286 if !outputs.is_null() {
1287 last = Some(outputs.clone());
1288 }
1289 }
1290 Ok(last)
1291}
1292
1293fn build_demo_host_config(tenant: &str) -> HostConfig {
1294 HostConfig {
1295 tenant: tenant.to_string(),
1296 bindings_path: PathBuf::from("<demo-provider>"),
1297 flow_type_bindings: HashMap::new(),
1298 rate_limits: RateLimits::default(),
1299 retry: FlowRetryConfig::default(),
1300 http_enabled: true,
1301 secrets_policy: SecretsPolicy::allow_all(),
1302 state_store_policy: StateStorePolicy::default(),
1303 webhook_policy: WebhookPolicy::default(),
1304 timers: Vec::new(),
1305 oauth: None,
1306 mocks: None,
1307 pack_bindings: Vec::new(),
1308 env_passthrough: Vec::new(),
1309 trace: TraceConfig::from_env(),
1310 validation: ValidationConfig::from_env(),
1311 operator_policy: OperatorPolicy::allow_all(),
1312 }
1313}
1314
1315fn validate_runner_binary(path: PathBuf) -> Option<PathBuf> {
1316 match fs::metadata(&path) {
1317 Ok(metadata) if metadata.is_file() && runner_binary_is_executable(&metadata) => Some(path),
1318 Ok(metadata) => {
1319 let reason = if !metadata.is_file() {
1320 "not a regular file"
1321 } else {
1322 "not executable"
1323 };
1324 operator_log::warn(
1325 module_path!(),
1326 format!(
1327 "runner binary '{}' is not usable ({})",
1328 path.display(),
1329 reason
1330 ),
1331 );
1332 None
1333 }
1334 Err(err) => {
1335 operator_log::warn(
1336 module_path!(),
1337 format!(
1338 "runner binary '{}' cannot be accessed: {}",
1339 path.display(),
1340 err
1341 ),
1342 );
1343 None
1344 }
1345 }
1346}
1347
1348fn pack_supports_provider_op(pack_path: &Path, op_id: &str) -> anyhow::Result<bool> {
1349 let file = std::fs::File::open(pack_path)?;
1350 let mut archive = ZipArchive::new(file)?;
1351 let mut manifest_entry = archive.by_name("manifest.cbor").map_err(|err| {
1352 anyhow!(
1353 "failed to open manifest.cbor in {}: {err}",
1354 pack_path.display()
1355 )
1356 })?;
1357 let mut bytes = Vec::new();
1358 manifest_entry.read_to_end(&mut bytes)?;
1359 let manifest = decode_pack_manifest(&bytes)
1360 .context("failed to decode pack manifest for op support introspection")?;
1361 if let Some(provider_ext) = manifest.provider_extension_inline() {
1362 if provider_ext
1363 .providers
1364 .iter()
1365 .any(|provider| provider.ops.iter().any(|op| op == op_id))
1366 {
1367 return Ok(true);
1368 }
1369 }
1370 if op_id == "ingest_http" {
1374 drop(manifest_entry);
1375 drop(bytes);
1376 let file2 = std::fs::File::open(pack_path)?;
1377 let archive2 = ZipArchive::new(file2)?;
1378 for i in 0..archive2.len() {
1379 if let Some(name) = archive2.name_for_index(i) {
1380 if name.ends_with(".wasm")
1381 && (name.contains("messaging-ingress-") || name.contains("messaging-provider-"))
1382 {
1383 return Ok(true);
1384 }
1385 }
1386 }
1387 }
1388 Ok(false)
1389}
1390
1391#[cfg(unix)]
1392fn runner_binary_is_executable(metadata: &fs::Metadata) -> bool {
1393 metadata.permissions().mode() & 0o111 != 0
1394}
1395
1396#[cfg(not(unix))]
1397fn runner_binary_is_executable(_: &fs::Metadata) -> bool {
1398 true
1399}
1400
1401fn payload_preview(bytes: &[u8]) -> String {
1402 const MAX_PREVIEW: usize = 256;
1403 if bytes.is_empty() {
1404 return "<empty>".to_string();
1405 }
1406 let preview_len = bytes.len().min(MAX_PREVIEW);
1407 if let Ok(text) = std::str::from_utf8(&bytes[..preview_len]) {
1408 if bytes.len() <= MAX_PREVIEW {
1409 text.to_string()
1410 } else {
1411 format!("{text}...")
1412 }
1413 } else {
1414 let encoded = general_purpose::STANDARD.encode(&bytes[..preview_len]);
1415 if bytes.len() <= MAX_PREVIEW {
1416 encoded
1417 } else {
1418 format!("{encoded}...")
1419 }
1420 }
1421}
1422
1423fn extract_token_validation_request(payload_bytes: &[u8]) -> Option<JsonValue> {
1424 let payload: JsonValue = serde_json::from_slice(payload_bytes).ok()?;
1425 let token = extract_bearer_token(&payload)?;
1426 let mut request = serde_json::Map::new();
1427 request.insert("token".to_string(), JsonValue::String(token));
1428 if let Some(issuer) = first_string_at_paths(
1429 &payload,
1430 &["/token_validation/issuer", "/auth/issuer", "/issuer"],
1431 ) {
1432 request.insert("issuer".to_string(), JsonValue::String(issuer));
1433 }
1434 if let Some(audience) = first_value_at_paths(
1435 &payload,
1436 &["/token_validation/audience", "/auth/audience", "/audience"],
1437 ) {
1438 request.insert("audience".to_string(), normalize_string_or_array(audience));
1439 }
1440 if let Some(scopes) = first_value_at_paths(
1441 &payload,
1442 &[
1443 "/token_validation/scopes",
1444 "/token_validation/required_scopes",
1445 "/auth/scopes",
1446 "/auth/required_scopes",
1447 "/scopes",
1448 ],
1449 ) {
1450 request.insert("scopes".to_string(), normalize_string_or_array(scopes));
1451 }
1452 Some(JsonValue::Object(request))
1453}
1454
1455fn extract_bearer_token(payload: &JsonValue) -> Option<String> {
1456 if let Some(value) = first_string_at_paths(
1457 payload,
1458 &[
1459 "/token_validation/token",
1460 "/auth/token",
1461 "/bearer_token",
1462 "/token",
1463 "/access_token",
1464 "/authorization",
1465 ],
1466 ) && let Some(token) = parse_bearer_value(&value)
1467 {
1468 return Some(token);
1469 }
1470
1471 if let Some(headers) = payload.get("headers")
1472 && let Some(token) = extract_bearer_from_headers(headers)
1473 {
1474 return Some(token);
1475 }
1476
1477 if let Some(value) = payload
1478 .pointer("/metadata/authorization")
1479 .and_then(JsonValue::as_str)
1480 && let Some(token) = parse_bearer_value(value)
1481 {
1482 return Some(token);
1483 }
1484
1485 None
1486}
1487
1488fn extract_bearer_from_headers(headers: &JsonValue) -> Option<String> {
1489 match headers {
1490 JsonValue::Object(map) => {
1491 for key in ["authorization", "Authorization"] {
1492 if let Some(value) = map.get(key).and_then(JsonValue::as_str)
1493 && let Some(token) = parse_bearer_value(value)
1494 {
1495 return Some(token);
1496 }
1497 }
1498 None
1499 }
1500 JsonValue::Array(values) => values.iter().find_map(|entry| {
1501 let name = entry
1502 .get("name")
1503 .or_else(|| entry.get("key"))
1504 .and_then(JsonValue::as_str)?;
1505 if !name.eq_ignore_ascii_case("authorization") {
1506 return None;
1507 }
1508 let value = entry.get("value").and_then(JsonValue::as_str)?;
1509 parse_bearer_value(value)
1510 }),
1511 _ => None,
1512 }
1513}
1514
1515fn parse_bearer_value(raw: &str) -> Option<String> {
1516 let trimmed = raw.trim();
1517 if trimmed.is_empty() {
1518 return None;
1519 }
1520 if let Some(rest) = trimmed.strip_prefix("Bearer ") {
1521 let token = rest.trim();
1522 if token.is_empty() {
1523 None
1524 } else {
1525 Some(token.to_string())
1526 }
1527 } else {
1528 Some(trimmed.to_string())
1529 }
1530}
1531
1532fn first_string_at_paths(payload: &JsonValue, paths: &[&str]) -> Option<String> {
1533 paths
1534 .iter()
1535 .find_map(|path| payload.pointer(path).and_then(JsonValue::as_str))
1536 .map(str::to_string)
1537}
1538
1539fn first_value_at_paths<'a>(payload: &'a JsonValue, paths: &[&str]) -> Option<&'a JsonValue> {
1540 paths.iter().find_map(|path| payload.pointer(path))
1541}
1542
1543fn normalize_string_or_array(value: &JsonValue) -> JsonValue {
1544 match value {
1545 JsonValue::String(raw) => {
1546 let values = raw
1547 .split_whitespace()
1548 .filter(|entry| !entry.trim().is_empty())
1549 .map(|entry| JsonValue::String(entry.to_string()))
1550 .collect::<Vec<_>>();
1551 JsonValue::Array(values)
1552 }
1553 JsonValue::Array(items) => JsonValue::Array(
1554 items
1555 .iter()
1556 .filter_map(|item| item.as_str())
1557 .map(|item| JsonValue::String(item.to_string()))
1558 .collect(),
1559 ),
1560 _ => JsonValue::Array(Vec::new()),
1561 }
1562}
1563
1564enum TokenValidationDecision {
1565 Allow(Option<JsonValue>),
1566 Deny(String),
1567}
1568
1569fn evaluate_token_validation_output(output: &JsonValue) -> TokenValidationDecision {
1570 let valid = output
1571 .get("valid")
1572 .and_then(JsonValue::as_bool)
1573 .or_else(|| output.get("ok").and_then(JsonValue::as_bool))
1574 .unwrap_or(false);
1575 if !valid {
1576 let reason = output
1577 .get("reason")
1578 .and_then(JsonValue::as_str)
1579 .or_else(|| output.get("error").and_then(JsonValue::as_str))
1580 .unwrap_or("invalid bearer token");
1581 return TokenValidationDecision::Deny(reason.to_string());
1582 }
1583 let claims = output
1584 .get("claims")
1585 .filter(|value| value.is_object())
1586 .cloned()
1587 .or_else(|| {
1588 output
1589 .as_object()
1590 .is_some_and(|map| map.contains_key("sub"))
1591 .then(|| output.clone())
1592 });
1593 TokenValidationDecision::Allow(claims)
1594}
1595
1596#[cfg(test)]
1597mod tests {
1598 use super::*;
1599 use serde_json::json;
1600
1601 #[test]
1602 fn token_validation_request_extracts_bearer_and_requirements() {
1603 let payload = json!({
1604 "headers": {
1605 "Authorization": "Bearer token-123"
1606 },
1607 "token_validation": {
1608 "issuer": "https://issuer.example",
1609 "audience": ["api://svc"],
1610 "required_scopes": "read write"
1611 }
1612 });
1613 let request =
1614 extract_token_validation_request(&serde_json::to_vec(&payload).expect("payload bytes"))
1615 .expect("request");
1616 assert_eq!(
1617 request.pointer("/token").and_then(JsonValue::as_str),
1618 Some("token-123")
1619 );
1620 assert_eq!(
1621 request.pointer("/issuer").and_then(JsonValue::as_str),
1622 Some("https://issuer.example")
1623 );
1624 assert_eq!(
1625 request.pointer("/audience/0").and_then(JsonValue::as_str),
1626 Some("api://svc")
1627 );
1628 assert_eq!(
1629 request.pointer("/scopes/0").and_then(JsonValue::as_str),
1630 Some("read")
1631 );
1632 assert_eq!(
1633 request.pointer("/scopes/1").and_then(JsonValue::as_str),
1634 Some("write")
1635 );
1636 }
1637
1638 #[test]
1639 fn token_validation_output_deny_when_invalid() {
1640 let output = json!({
1641 "valid": false,
1642 "reason": "issuer mismatch"
1643 });
1644 match evaluate_token_validation_output(&output) {
1645 TokenValidationDecision::Deny(reason) => {
1646 assert_eq!(reason, "issuer mismatch");
1647 }
1648 TokenValidationDecision::Allow(_) => panic!("expected deny"),
1649 }
1650 }
1651
1652 #[test]
1653 fn token_validation_output_allows_and_returns_claims() {
1654 let output = json!({
1655 "valid": true,
1656 "claims": {
1657 "sub": "user-1",
1658 "scope": "read write",
1659 "aud": ["api://svc"]
1660 }
1661 });
1662 match evaluate_token_validation_output(&output) {
1663 TokenValidationDecision::Allow(Some(claims)) => {
1664 assert_eq!(
1665 claims.pointer("/sub").and_then(JsonValue::as_str),
1666 Some("user-1")
1667 );
1668 }
1669 TokenValidationDecision::Allow(None) => panic!("expected claims"),
1670 TokenValidationDecision::Deny(reason) => panic!("unexpected deny: {reason}"),
1671 }
1672 }
1673}