Skip to main content

greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::identify_hint::IdentifyInstanceHint;
14use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
15use crate::provider::{ProviderBinding, ProviderRegistry};
16use crate::provider_core::{
17    schema_core::SchemaCorePre as LegacySchemaCorePre,
18    schema_core_path::SchemaCorePre as PathSchemaCorePre,
19    schema_core_schema::SchemaCorePre as SchemaSchemaCorePre,
20};
21use crate::provider_core_only;
22use crate::runtime_refs::RuntimeRefsInjection;
23use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
24use anyhow::{Context, Result, anyhow, bail};
25use futures::executor::block_on;
26use greentic_distributor_client::dist::{
27    CachePolicy, DistClient, DistError, DistOptions, ResolvePolicy,
28};
29use greentic_interfaces_wasmtime::host_helpers::v1::{
30    self as host_v1, HostFns, add_all_v1_to_linker,
31    runner_host_http::RunnerHostHttp,
32    runner_host_kv::RunnerHostKv,
33    runtime_config::{ConfigError, RuntimeConfigHost},
34    secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
35    state_store::{
36        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
37        StateStoreHost, TenantCtx as StateTenantCtx,
38    },
39    telemetry_logger::{
40        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
41        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
42        TenantCtx as TelemetryTenantCtx,
43    },
44};
45use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::http::http_client as http_client_client_alias;
46use greentic_interfaces_wasmtime::instance_identity_instance_identity_describe_v0_1::InstanceIdentityDescribePre;
47use greentic_interfaces_wasmtime::instance_identity_v0_1::InstanceIdentityPre;
48use greentic_interfaces_wasmtime::{
49    http_client_client_v1_0::greentic::interfaces_types::types as http_types_v1_0,
50    http_client_client_v1_1::greentic::interfaces_types::types as http_types_v1_1,
51};
52use greentic_pack::builder as legacy_pack;
53use greentic_types::flow::FlowHasher;
54use greentic_types::{
55    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
56    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
57    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
58    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
59    pack_manifest::ExtensionInline,
60};
61use host_v1::http_client as host_http_client;
62use host_v1::http_client::{
63    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
64    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
65    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
66    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
67};
68use indexmap::IndexMap;
69use once_cell::sync::Lazy;
70use parking_lot::{Mutex, RwLock};
71use reqwest::blocking::Client as BlockingClient;
72use runner_core::normalize_under_root;
73use serde::{Deserialize, Serialize};
74use serde_cbor;
75use serde_json::{self, Value};
76use sha2::Digest;
77use tempfile::TempDir;
78use tokio::fs;
79use wasmparser::{Parser, Payload};
80use wasmtime::{Store, StoreContextMut};
81use wasmtime_wasi_http::WasiHttpCtx;
82use wasmtime_wasi_http::p2::{
83    WasiHttpCtxView, WasiHttpView, add_only_http_to_linker_sync as add_wasi_http_to_linker,
84};
85use wasmtime_wasi_tls::p2::LinkOptions;
86use wasmtime_wasi_tls::{WasiTlsCtx, WasiTlsCtxBuilder, WasiTlsCtxView, WasiTlsView};
87use zip::ZipArchive;
88
89use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
90use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
91use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
92#[cfg(feature = "fault-injection")]
93use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
94
95use crate::config::HostConfig;
96use crate::fault;
97use crate::secrets::{
98    DynSecretsManager, canonicalize_secret_key, read_secret_blocking, write_secret_blocking,
99};
100use crate::storage::state::STATE_PREFIX;
101use crate::storage::{DynSessionStore, DynStateStore};
102use crate::verify;
103use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
104use tracing::warn;
105use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
106use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
107
108use greentic_flow::model::FlowDoc;
109
110#[allow(dead_code)]
111pub struct PackRuntime {
112    /// Component artifact path (wasm file).
113    path: PathBuf,
114    /// Optional archive (.gtpack) used to load flows/manifests.
115    archive_path: Option<PathBuf>,
116    config: Arc<HostConfig>,
117    engine: Engine,
118    metadata: PackMetadata,
119    manifest: Option<greentic_types::PackManifest>,
120    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
121    component_manifests: HashMap<String, ComponentManifest>,
122    mocks: Option<Arc<MockLayer>>,
123    flows: Option<PackFlows>,
124    components: HashMap<String, PackComponent>,
125    http_client: Arc<BlockingClient>,
126    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
127    session_store: Option<DynSessionStore>,
128    state_store: Option<DynStateStore>,
129    wasi_policy: Arc<RunnerWasiPolicy>,
130    assets_tempdir: Option<TempDir>,
131    provider_registry: RwLock<Option<ProviderRegistry>>,
132    /// Per-revision lazy cache of `describe-identify-instance` results,
133    /// keyed by `component_ref`. `None` value means the component does not
134    /// export the describe world (or the hint was malformed) — the
135    /// caller falls back to passing input headers through unchanged. The
136    /// outer `Option` distinguishes "not probed yet" from "probed and
137    /// has no hint". `ArcSwap`-driven revision swaps allocate a fresh
138    /// `PackRuntime` so this cache is naturally invalidated.
139    identify_hint_cache: RwLock<HashMap<String, Option<IdentifyInstanceHint>>>,
140    secrets: DynSecretsManager,
141    oauth_config: Option<OAuthBrokerConfig>,
142    cache: CacheManager,
143    /// `pack-config.v1.non_secret` map plumbed into each `HostState` for the
144    /// `greentic:runtime-config@1.0.0` host import. Defaults to `None` when no
145    /// producer (greentic-start) has materialized a `PackConfig` yet; in that
146    /// case all runtime-config lookups fall through to the secrets-store
147    /// compat shim.
148    runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
149    /// `pack-config.v1.runtime_refs` (C5): per-pack `key → URI` bindings plus
150    /// the env-shared [`RuntimeRefResolver`]. Consulted by the
151    /// `greentic:runtime-config@1.0.0` host import AFTER `non_secret` and
152    /// BEFORE the compat shim. `None` when no producer set it yet.
153    ///
154    /// [`RuntimeRefResolver`]: crate::runtime_refs::RuntimeRefResolver
155    runtime_refs: Option<RuntimeRefsInjection>,
156}
157
158struct PackComponent {
159    #[allow(dead_code)]
160    name: String,
161    #[allow(dead_code)]
162    version: String,
163    component: Arc<Component>,
164}
165
166/// Outcome of calling a provider component's `identify-instance` export
167/// (`greentic:provider-instance-identity@0.1.0`). Callers MUST treat the
168/// three variants differently per the WIT contract.
169#[derive(Debug, Clone, PartialEq, Eq)]
170pub enum IdentifyOutcome {
171    /// Component does not export the world — caller falls back to the
172    /// operator's statically-declared `provider_id`.
173    Unsupported,
174    /// Component exported the world and returned `None` — caller MUST
175    /// fail closed (401/404), no fallback.
176    NoMatch,
177    /// Component identified the payload as belonging to this
178    /// `provider_id` — caller routes to the matching `MessagingEndpoint`.
179    Identified(String),
180}
181
182impl IdentifyOutcome {
183    /// Merge `other` into `self` per the lattice
184    /// `Identified > NoMatch > Unsupported`. Used by callers fanning the probe
185    /// out over multiple packs (overlays) where the strongest signal across
186    /// packs wins.
187    pub fn merge_in(&mut self, other: IdentifyOutcome) {
188        match (&*self, &other) {
189            // Identified is the top — never gets overwritten.
190            (IdentifyOutcome::Identified(_), _) => {}
191            // Promote to Identified from anything else.
192            (_, IdentifyOutcome::Identified(_)) => *self = other,
193            // NoMatch promotes Unsupported but cannot downgrade itself.
194            (IdentifyOutcome::Unsupported, IdentifyOutcome::NoMatch) => *self = other,
195            _ => {}
196        }
197    }
198}
199
200fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
201where
202    F: FnOnce() -> Result<T> + Send + 'static,
203    T: Send + 'static,
204{
205    let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
206    let handle = builder
207        .spawn(move || {
208            let pid = std::process::id();
209            let thread_id = std::thread::current().id();
210            let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
211            tracing::info!(
212                event = "wasmtime.thread.start",
213                task = task_name,
214                pid,
215                thread_id = ?thread_id,
216                tokio_handle_present,
217                "starting Wasmtime thread"
218            );
219            task()
220        })
221        .context("failed to spawn Wasmtime thread")?;
222    handle
223        .join()
224        .map_err(|err| {
225            let reason = if let Some(msg) = err.downcast_ref::<&str>() {
226                msg.to_string()
227            } else if let Some(msg) = err.downcast_ref::<String>() {
228                msg.clone()
229            } else {
230                "unknown panic".to_string()
231            };
232            anyhow!("Wasmtime thread panicked: {reason}")
233        })
234        .and_then(|res| res)
235}
236
237#[derive(Debug, Default, Clone)]
238pub struct ComponentResolution {
239    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
240    pub materialized_root: Option<PathBuf>,
241    /// Explicit overrides mapping component id -> wasm path.
242    pub overrides: HashMap<String, PathBuf>,
243    /// If true, do not fetch remote components; require cached artifacts.
244    pub dist_offline: bool,
245    /// Optional cache directory for resolved remote components.
246    pub dist_cache_dir: Option<PathBuf>,
247    /// Allow bundled components without wasm_sha256 (dev-only escape hatch).
248    pub allow_missing_hash: bool,
249}
250
251fn build_blocking_client() -> BlockingClient {
252    std::thread::spawn(|| {
253        BlockingClient::builder()
254            .no_proxy()
255            .build()
256            .expect("blocking client")
257    })
258    .join()
259    .expect("client build thread panicked")
260}
261
262fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
263    let (root, candidate) = if path.is_absolute() {
264        let parent = path
265            .parent()
266            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
267        let root = parent
268            .canonicalize()
269            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
270        let file = path
271            .file_name()
272            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
273        (root, PathBuf::from(file))
274    } else {
275        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
276        let base = if let Some(parent) = path.parent() {
277            cwd.join(parent)
278        } else {
279            cwd
280        };
281        let root = base
282            .canonicalize()
283            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
284        let file = path
285            .file_name()
286            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
287        (root, PathBuf::from(file))
288    };
289    let safe = normalize_under_root(&root, &candidate)?;
290    Ok((root, safe))
291}
292
293static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
294
295#[derive(Debug, Clone, Serialize, Deserialize)]
296pub struct FlowDescriptor {
297    pub id: String,
298    #[serde(rename = "type")]
299    pub flow_type: String,
300    pub pack_id: String,
301    pub profile: String,
302    pub version: String,
303    #[serde(default)]
304    pub description: Option<String>,
305}
306
307pub struct HostState {
308    #[allow(dead_code)]
309    pack_id: String,
310    config: Arc<HostConfig>,
311    http_client: Arc<BlockingClient>,
312    default_env: String,
313    #[allow(dead_code)]
314    session_store: Option<DynSessionStore>,
315    state_store: Option<DynStateStore>,
316    mocks: Option<Arc<MockLayer>>,
317    secrets: DynSecretsManager,
318    oauth_config: Option<OAuthBrokerConfig>,
319    oauth_host: OAuthBrokerHost,
320    exec_ctx: Option<ComponentExecCtx>,
321    component_ref: Option<String>,
322    provider_core_component: bool,
323    /// `pack-config.v1.non_secret` map for the `greentic:runtime-config@1.0.0`
324    /// host import. Populated by the producer (greentic-start) from the
325    /// deployed `PackConfig`; `None` when no PackConfig was published, in
326    /// which case lookups fall back to the secrets-store compat shim with
327    /// a once-per-process deprecation warning.
328    runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
329    /// `pack-config.v1.runtime_refs` (C5) injection: per-pack `key → URI`
330    /// bindings plus the env-shared resolver. The host import resolves the
331    /// URI on every call so the value tracks `runtime.json` hot-reloads.
332    runtime_refs: Option<RuntimeRefsInjection>,
333}
334
335impl HostState {
336    #[allow(clippy::default_constructed_unit_structs)]
337    #[allow(clippy::too_many_arguments)]
338    pub fn new(
339        pack_id: String,
340        config: Arc<HostConfig>,
341        http_client: Arc<BlockingClient>,
342        mocks: Option<Arc<MockLayer>>,
343        session_store: Option<DynSessionStore>,
344        state_store: Option<DynStateStore>,
345        secrets: DynSecretsManager,
346        oauth_config: Option<OAuthBrokerConfig>,
347        exec_ctx: Option<ComponentExecCtx>,
348        component_ref: Option<String>,
349        provider_core_component: bool,
350        runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
351        runtime_refs: Option<RuntimeRefsInjection>,
352    ) -> Result<Self> {
353        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
354        Ok(Self {
355            pack_id,
356            config,
357            http_client,
358            default_env,
359            session_store,
360            state_store,
361            mocks,
362            secrets,
363            oauth_config,
364            oauth_host: OAuthBrokerHost::default(),
365            exec_ctx,
366            component_ref,
367            provider_core_component,
368            runtime_config_non_secret,
369            runtime_refs,
370        })
371    }
372
373    fn instantiate_component_result(
374        linker: &mut Linker<ComponentState>,
375        store: &mut Store<ComponentState>,
376        component: &Component,
377        ctx: &ComponentExecCtx,
378        component_ref: &str,
379        operation: &str,
380        input_json: &str,
381    ) -> Result<InvokeResult> {
382        let pre_instance = linker.instantiate_pre(component)?;
383        match component_api::v0_6::ComponentPre::new(pre_instance) {
384            Ok(pre) => {
385                let envelope = component_api::envelope_v0_6(ctx, component_ref, input_json)?;
386                let operation_owned = operation.to_string();
387                let result = block_on(async {
388                    let bindings = pre.instantiate_async(&mut *store).await?;
389                    let node = bindings.greentic_component_node();
390                    node.call_invoke(&mut *store, &operation_owned, &envelope)
391                })?;
392                component_api::invoke_result_from_v0_6(result)
393            }
394            Err(err_v06) => {
395                if !is_missing_node_export(&err_v06, "0.6.0") {
396                    return Err(err_v06.into());
397                }
398                let pre_instance = linker.instantiate_pre(component)?;
399                match component_api::v0_5::ComponentPre::new(pre_instance) {
400                    Ok(pre) => {
401                        let result = block_on(async {
402                            let bindings = pre.instantiate_async(&mut *store).await?;
403                            let node = bindings.greentic_component_node();
404                            let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
405                            let operation_owned = operation.to_string();
406                            let input_owned = input_json.to_string();
407                            node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
408                        })?;
409                        Ok(component_api::invoke_result_from_v0_5(result))
410                    }
411                    Err(err) => {
412                        if !is_missing_node_export(&err, "0.5.0") {
413                            return Err(err.into());
414                        }
415                        let pre_instance = linker.instantiate_pre(component)?;
416                        match component_api::v0_4::ComponentPre::new(pre_instance) {
417                            Ok(pre) => {
418                                let result = block_on(async {
419                                    let bindings = pre.instantiate_async(&mut *store).await?;
420                                    let node = bindings.greentic_component_node();
421                                    let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
422                                    let operation_owned = operation.to_string();
423                                    let input_owned = input_json.to_string();
424                                    node.call_invoke(
425                                        &mut *store,
426                                        &ctx_v04,
427                                        &operation_owned,
428                                        &input_owned,
429                                    )
430                                })?;
431                                Ok(component_api::invoke_result_from_v0_4(result))
432                            }
433                            Err(err_v04) => {
434                                if is_missing_node_export(&err_v04, "0.4.0") {
435                                    Self::try_v06_runtime(linker, store, component, input_json)
436                                } else {
437                                    Err(err_v04.into())
438                                }
439                            }
440                        }
441                    }
442                }
443            }
444        }
445    }
446
447    /// Fallback for v0.6 components that export `component-runtime::run(input, state)`
448    /// instead of the legacy `node::invoke(ctx, op, input)`.
449    fn try_v06_runtime(
450        linker: &mut Linker<ComponentState>,
451        store: &mut Store<ComponentState>,
452        component: &Component,
453        input_json: &str,
454    ) -> Result<InvokeResult> {
455        let pre_instance = linker.instantiate_pre(component)?;
456        let pre = component_api::v0_6_runtime::ComponentV0V6RuntimePre::new(pre_instance).map_err(
457            |err| err.context("component exports neither node@0.5/0.4 nor component-runtime@0.6"),
458        )?;
459
460        let result = block_on(async {
461            let bindings = pre.instantiate_async(&mut *store).await?;
462            let runtime = bindings.greentic_component_component_runtime();
463
464            // Encode input as CBOR — the component's run() expects CBOR bytes.
465            let input_value: Value = serde_json::from_str(input_json).unwrap_or(Value::Null);
466            let input_cbor =
467                serde_cbor::to_vec(&input_value).context("encode input as CBOR for v0.6")?;
468            let empty_state = serde_cbor::to_vec(&Value::Object(Default::default()))
469                .context("encode empty state")?;
470
471            let run_result = runtime
472                .call_run(&mut *store, &input_cbor, &empty_state)
473                .map_err(|err| err.context("v0.6 component-runtime::run call failed"))?;
474
475            // Decode output CBOR to JSON.
476            let output_value: Value = serde_cbor::from_slice(&run_result.output)
477                .context("decode v0.6 run output CBOR")?;
478            let output_json = serde_json::to_string(&output_value)
479                .context("serialize v0.6 run output to JSON")?;
480
481            Ok::<_, anyhow::Error>(output_json)
482        })?;
483
484        Ok(InvokeResult::Ok(result))
485    }
486
487    fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
488        match result {
489            InvokeResult::Ok(body) => {
490                if body.is_empty() {
491                    return Ok(Value::Null);
492                }
493                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
494            }
495            InvokeResult::Err(NodeError {
496                code,
497                message,
498                retryable,
499                backoff_ms,
500                details,
501            }) => {
502                let mut obj = serde_json::Map::new();
503                obj.insert("ok".into(), Value::Bool(false));
504                let mut error = serde_json::Map::new();
505                error.insert("code".into(), Value::String(code));
506                error.insert("message".into(), Value::String(message));
507                error.insert("retryable".into(), Value::Bool(retryable));
508                if let Some(backoff) = backoff_ms {
509                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
510                }
511                if let Some(details) = details {
512                    error.insert(
513                        "details".into(),
514                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
515                    );
516                }
517                obj.insert("error".into(), Value::Object(error));
518                Ok(Value::Object(obj))
519            }
520        }
521    }
522
523    /// Build a `TenantCtx` for secrets lookups that includes the team from the
524    /// execution context. `config.tenant_ctx()` only populates env + tenant;
525    /// without this, secrets scoped to a specific team are unreachable.
526    fn secrets_tenant_ctx(&self) -> TypesTenantCtx {
527        let mut ctx = self.config.tenant_ctx();
528        if let Some(exec_ctx) = self.exec_ctx.as_ref()
529            && let Some(team) = exec_ctx.tenant.team.as_ref()
530            && let Ok(team_id) = TeamId::from_str(team)
531        {
532            ctx = ctx.with_team(Some(team_id));
533        }
534        ctx
535    }
536
537    pub fn get_secret(&self, key: &str) -> Result<String> {
538        if provider_core_only::is_enabled() {
539            bail!(provider_core_only::blocked_message("secrets"))
540        }
541        if !self.config.secrets_policy.is_allowed(key) {
542            bail!("secret {key} is not permitted by bindings policy");
543        }
544        if let Some(mock) = &self.mocks
545            && let Some(value) = mock.secrets_lookup(key)
546        {
547            return Ok(value);
548        }
549        let ctx = self.secrets_tenant_ctx();
550        let canonical_key = canonicalize_secret_key(key);
551        let bytes = read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key)
552            .context("failed to read secret from manager")?;
553        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
554        Ok(value)
555    }
556
557    fn allows_secret_write_in_provider_core_only(&self) -> bool {
558        self.provider_core_component || self.component_ref.is_none()
559    }
560
561    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
562        let tenant_raw = ctx
563            .as_ref()
564            .map(|ctx| ctx.tenant.clone())
565            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
566            .unwrap_or_else(|| self.config.tenant.clone());
567        let env_raw = ctx
568            .as_ref()
569            .map(|ctx| ctx.env.clone())
570            .unwrap_or_else(|| self.default_env.clone());
571        let tenant_id = TenantId::from_str(&tenant_raw)
572            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
573        let env_id = EnvId::from_str(&env_raw)
574            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
575        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
576        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
577            if let Some(team) = exec_ctx.tenant.team.as_ref() {
578                let team_id =
579                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
580                tenant_ctx = tenant_ctx.with_team(Some(team_id));
581            }
582            if let Some(user) = exec_ctx.tenant.user.as_ref() {
583                let user_id =
584                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
585                tenant_ctx = tenant_ctx.with_user(Some(user_id));
586            }
587            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
588            if let Some(node) = exec_ctx.node_id.as_ref() {
589                tenant_ctx = tenant_ctx.with_node(node.clone());
590            }
591            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
592                tenant_ctx = tenant_ctx.with_session(session.clone());
593            }
594            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
595        }
596
597        if let Some(ctx) = ctx {
598            if let Some(team) = ctx.team.or(ctx.team_id) {
599                let team_id =
600                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
601                tenant_ctx = tenant_ctx.with_team(Some(team_id));
602            }
603            if let Some(user) = ctx.user.or(ctx.user_id) {
604                let user_id =
605                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
606                tenant_ctx = tenant_ctx.with_user(Some(user_id));
607            }
608            if let Some(flow) = ctx.flow_id {
609                tenant_ctx = tenant_ctx.with_flow(flow);
610            }
611            if let Some(node) = ctx.node_id {
612                tenant_ctx = tenant_ctx.with_node(node);
613            }
614            if let Some(provider) = ctx.provider_id {
615                tenant_ctx = tenant_ctx.with_provider(provider);
616            }
617            if let Some(session) = ctx.session_id {
618                tenant_ctx = tenant_ctx.with_session(session);
619            }
620            tenant_ctx.trace_id = ctx.trace_id;
621        }
622        Ok(tenant_ctx)
623    }
624
625    fn send_http_request(
626        &mut self,
627        req: HttpRequest,
628        opts: Option<HttpRequestOptionsV1_1>,
629        _ctx: Option<HttpTenantCtx>,
630    ) -> Result<HttpResponse, HttpClientError> {
631        if !self.config.http_enabled {
632            return Err(HttpClientError {
633                code: "denied".into(),
634                message: "http client disabled by policy".into(),
635            });
636        }
637
638        let mut mock_state = None;
639        let raw_body = req.body.clone();
640        if let Some(mock) = &self.mocks
641            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
642        {
643            match mock.http_begin(&meta) {
644                HttpDecision::Mock(response) => {
645                    let headers = response
646                        .headers
647                        .iter()
648                        .map(|(k, v)| (k.clone(), v.clone()))
649                        .collect();
650                    return Ok(HttpResponse {
651                        status: response.status,
652                        headers,
653                        body: response.body.clone().map(|b| b.into_bytes()),
654                    });
655                }
656                HttpDecision::Deny(reason) => {
657                    return Err(HttpClientError {
658                        code: "denied".into(),
659                        message: reason,
660                    });
661                }
662                HttpDecision::Passthrough { record } => {
663                    mock_state = Some((meta, record));
664                }
665            }
666        }
667
668        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
669        let mut builder = self.http_client.request(method, &req.url);
670        for (key, value) in req.headers {
671            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
672                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
673            {
674                builder = builder.header(header, header_value);
675            }
676        }
677
678        if let Some(body) = raw_body.clone() {
679            builder = builder.body(body);
680        }
681
682        if let Some(opts) = opts {
683            if let Some(timeout_ms) = opts.timeout_ms {
684                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
685            }
686            if opts.allow_insecure == Some(true) {
687                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
688            }
689            if let Some(follow_redirects) = opts.follow_redirects
690                && !follow_redirects
691            {
692                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
693            }
694        }
695
696        let response = match builder.send() {
697            Ok(resp) => resp,
698            Err(err) => {
699                warn!(url = %req.url, error = %err, "http client request failed");
700                return Err(HttpClientError {
701                    code: "unavailable".into(),
702                    message: err.to_string(),
703                });
704            }
705        };
706
707        let status = response.status().as_u16();
708        let headers_vec = response
709            .headers()
710            .iter()
711            .map(|(k, v)| {
712                (
713                    k.as_str().to_string(),
714                    v.to_str().unwrap_or_default().to_string(),
715                )
716            })
717            .collect::<Vec<_>>();
718        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
719
720        if let Some((meta, true)) = mock_state.take()
721            && let Some(mock) = &self.mocks
722        {
723            let recorded = HttpMockResponse::new(
724                status,
725                headers_vec.clone().into_iter().collect(),
726                body_bytes
727                    .as_ref()
728                    .map(|b| String::from_utf8_lossy(b).into_owned()),
729            );
730            mock.http_record(&meta, &recorded);
731        }
732
733        Ok(HttpResponse {
734            status,
735            headers: headers_vec,
736            body: body_bytes,
737        })
738    }
739}
740
741#[cfg(test)]
742mod canonicalize_tests {
743    use crate::secrets::canonicalize_secret_key;
744
745    #[test]
746    fn upper_snake_to_lower_snake() {
747        assert_eq!(
748            canonicalize_secret_key("TELEGRAM_BOT_TOKEN"),
749            "telegram_bot_token"
750        );
751    }
752
753    #[test]
754    fn trim_and_replace_non_alphanumeric() {
755        assert_eq!(
756            canonicalize_secret_key("  webex-bot-token  "),
757            "webex_bot_token"
758        );
759    }
760
761    #[test]
762    fn preserve_existing_lower_snake_with_extra_underscores() {
763        assert_eq!(canonicalize_secret_key("MiXeD__Case"), "mixed__case");
764    }
765}
766
767impl SecretsStoreHost for HostState {
768    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
769        if provider_core_only::is_enabled() {
770            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
771            return Err(SecretsError::Denied);
772        }
773        if !self.config.secrets_policy.is_allowed(&key) {
774            return Err(SecretsError::Denied);
775        }
776        if let Some(mock) = &self.mocks
777            && let Some(value) = mock.secrets_lookup(&key)
778        {
779            return Ok(Some(value.into_bytes()));
780        }
781        let ctx = self.secrets_tenant_ctx();
782        let canonical_key = canonicalize_secret_key(&key);
783        match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
784            Ok(bytes) => Ok(Some(bytes)),
785            Err(err) => {
786                warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
787                Err(SecretsError::NotFound)
788            }
789        }
790    }
791}
792
793impl SecretsStoreHostV1_1 for HostState {
794    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
795        if provider_core_only::is_enabled() {
796            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
797            return Err(SecretsErrorV1_1::Denied);
798        }
799        if !self.config.secrets_policy.is_allowed(&key) {
800            return Err(SecretsErrorV1_1::Denied);
801        }
802        if let Some(mock) = &self.mocks
803            && let Some(value) = mock.secrets_lookup(&key)
804        {
805            return Ok(Some(value.into_bytes()));
806        }
807        let ctx = self.secrets_tenant_ctx();
808        let canonical_key = canonicalize_secret_key(&key);
809        match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
810            Ok(bytes) => Ok(Some(bytes)),
811            Err(err) => {
812                warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
813                Err(SecretsErrorV1_1::NotFound)
814            }
815        }
816    }
817
818    fn put(&mut self, key: String, value: Vec<u8>) {
819        if key.trim().is_empty() {
820            warn!(secret = %key, "secret write blocked: empty key");
821            panic!("secret write denied for key {key}: invalid key");
822        }
823        if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
824            warn!(
825                secret = %key,
826                component = self.component_ref.as_deref().unwrap_or("<pack>"),
827                "provider-core only mode enabled; blocking secrets store write"
828            );
829            panic!("secret write denied for key {key}: provider-core-only mode");
830        }
831        if !self.config.secrets_policy.is_allowed(&key) {
832            warn!(secret = %key, "secret write denied by bindings policy");
833            panic!("secret write denied for key {key}: policy");
834        }
835        let ctx = self.secrets_tenant_ctx();
836        let canonical_key = canonicalize_secret_key(&key);
837        if let Err(err) =
838            write_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key, &value)
839        {
840            warn!(secret = %key, canonical = %canonical_key, error = %err, "secret write failed");
841            panic!("secret write failed for key {key}");
842        }
843    }
844}
845
846/// Process-global set of `pack-config.v1` keys for which the compat shim has
847/// already logged a deprecation warning. Used to debounce once-per-process
848/// per-key so resolving the same legacy key from many invocations does not
849/// spam the log.
850static WARNED_COMPAT_KEYS: Lazy<Mutex<HashSet<String>>> = Lazy::new(|| Mutex::new(HashSet::new()));
851
852fn warn_compat_fallback_once(key: &str) {
853    let mut warned = WARNED_COMPAT_KEYS.lock();
854    if warned.insert(key.to_string()) {
855        warn!(
856            key = %key,
857            "runtime-config key resolved via secrets-store compat fallback; \
858             move this value into pack-config.v1.non_secret"
859        );
860    }
861}
862
863impl RuntimeConfigHost for HostState {
864    fn get(&mut self, key: String) -> Result<Option<String>, ConfigError> {
865        if key.trim().is_empty() {
866            return Err(ConfigError::InvalidKey);
867        }
868
869        // 1) Primary channel: pack-config.v1.non_secret. Values are stored as
870        //    `serde_json::Value`; the WIT contract returns UTF-8 strings
871        //    conventionally JSON-encoded, so stringify here.
872        if let Some(map) = self.runtime_config_non_secret.as_ref()
873            && let Some(value) = map.get(&key)
874        {
875            return serde_json::to_string(value).map(Some).map_err(|err| {
876                warn!(key = %key, error = %err, "runtime-config value JSON-encode failed");
877                ConfigError::Internal
878            });
879        }
880
881        // 1b) C5 channel: pack-config.v1.runtime_refs. Resolved on every call
882        //     so values track `runtime.json` hot-reloads. The per-pack `refs`
883        //     map gates which keys this channel claims; non-bound keys fall
884        //     through to the compat shim.
885        if let Some(injection) = self.runtime_refs.as_ref()
886            && let Some(uri) = injection.refs.get(&key)
887        {
888            use crate::runtime_refs::RuntimeRefResolverError;
889            return match injection.resolver.resolve(uri) {
890                Ok(Some(value)) => serde_json::to_string(&value).map(Some).map_err(|err| {
891                    warn!(key = %key, error = %err, "runtime-ref value JSON-encode failed");
892                    ConfigError::Internal
893                }),
894                Ok(None) => Ok(None),
895                Err(err @ RuntimeRefResolverError::Invalid(_)) => {
896                    warn!(key = %key, error = %err, "runtime-ref rejected");
897                    Err(ConfigError::InvalidKey)
898                }
899                Err(err @ RuntimeRefResolverError::Internal(_)) => {
900                    warn!(key = %key, error = %err, "runtime-ref resolution failed");
901                    Err(ConfigError::Internal)
902                }
903            };
904        }
905
906        // 2) Compat fallback: try the secrets-store. Warn once per key per
907        //    process so this stays visible without spamming the log.
908        match SecretsStoreHost::get(self, key.clone()) {
909            Ok(Some(bytes)) => match String::from_utf8(bytes) {
910                Ok(value) => {
911                    warn_compat_fallback_once(&key);
912                    Ok(Some(value))
913                }
914                Err(_) => {
915                    warn!(
916                        key = %key,
917                        "runtime-config compat fallback found non-UTF-8 secret bytes; \
918                         returning not-found"
919                    );
920                    Err(ConfigError::Internal)
921                }
922            },
923            Ok(None) => Ok(None),
924            Err(SecretsError::NotFound) => Ok(None),
925            Err(SecretsError::Denied) => Err(ConfigError::Denied),
926            Err(SecretsError::InvalidKey) => Err(ConfigError::InvalidKey),
927            Err(SecretsError::Internal) => Err(ConfigError::Internal),
928        }
929    }
930}
931
932impl HttpClientHost for HostState {
933    fn send(
934        &mut self,
935        req: HttpRequest,
936        ctx: Option<HttpTenantCtx>,
937    ) -> Result<HttpResponse, HttpClientError> {
938        self.send_http_request(req, None, ctx)
939    }
940}
941
942impl HttpClientHostV1_1 for HostState {
943    fn send(
944        &mut self,
945        req: HttpRequestV1_1,
946        opts: Option<HttpRequestOptionsV1_1>,
947        ctx: Option<HttpTenantCtxV1_1>,
948    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
949        let legacy_req = HttpRequest {
950            method: req.method,
951            url: req.url,
952            headers: req.headers,
953            body: req.body,
954        };
955        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
956            env: ctx.env,
957            tenant: ctx.tenant,
958            tenant_id: ctx.tenant_id,
959            team: ctx.team,
960            team_id: ctx.team_id,
961            user: ctx.user,
962            user_id: ctx.user_id,
963            trace_id: ctx.trace_id,
964            correlation_id: ctx.correlation_id,
965            i18n_id: ctx.i18n_id,
966            attributes: ctx.attributes,
967            session_id: ctx.session_id,
968            flow_id: ctx.flow_id,
969            node_id: ctx.node_id,
970            provider_id: ctx.provider_id,
971            deadline_ms: ctx.deadline_ms,
972            attempt: ctx.attempt,
973            idempotency_key: ctx.idempotency_key,
974            impersonation: ctx.impersonation.map(|imp| http_types_v1_0::Impersonation {
975                actor_id: imp.actor_id,
976                reason: imp.reason,
977            }),
978        });
979
980        self.send_http_request(legacy_req, opts, legacy_ctx)
981            .map(|resp| HttpResponseV1_1 {
982                status: resp.status,
983                headers: resp.headers,
984                body: resp.body,
985            })
986            .map_err(|err| HttpClientErrorV1_1 {
987                code: err.code,
988                message: err.message,
989            })
990    }
991}
992
993impl StateStoreHost for HostState {
994    fn read(
995        &mut self,
996        key: HostStateKey,
997        ctx: Option<StateTenantCtx>,
998    ) -> Result<Vec<u8>, StateError> {
999        let store = match self.state_store.as_ref() {
1000            Some(store) => store.clone(),
1001            None => {
1002                return Err(StateError {
1003                    code: "unavailable".into(),
1004                    message: "state store not configured".into(),
1005                });
1006            }
1007        };
1008        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1009            Ok(ctx) => ctx,
1010            Err(err) => {
1011                return Err(StateError {
1012                    code: "invalid-ctx".into(),
1013                    message: err.to_string(),
1014                });
1015            }
1016        };
1017        #[cfg(feature = "fault-injection")]
1018        {
1019            let exec_ctx = self.exec_ctx.as_ref();
1020            let flow_id = exec_ctx
1021                .map(|ctx| ctx.flow_id.as_str())
1022                .unwrap_or("unknown");
1023            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
1024            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
1025            let fault_ctx = FaultContext {
1026                pack_id: self.pack_id.as_str(),
1027                flow_id,
1028                node_id,
1029                attempt,
1030            };
1031            if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
1032                return Err(StateError {
1033                    code: "internal".into(),
1034                    message: err.to_string(),
1035                });
1036            }
1037        }
1038        let key = StoreStateKey::from(key);
1039        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
1040            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
1041            Ok(None) => Err(StateError {
1042                code: "not_found".into(),
1043                message: "state key not found".into(),
1044            }),
1045            Err(err) => Err(StateError {
1046                code: "internal".into(),
1047                message: err.to_string(),
1048            }),
1049        }
1050    }
1051
1052    fn write(
1053        &mut self,
1054        key: HostStateKey,
1055        bytes: Vec<u8>,
1056        ctx: Option<StateTenantCtx>,
1057    ) -> Result<StateOpAck, StateError> {
1058        let store = match self.state_store.as_ref() {
1059            Some(store) => store.clone(),
1060            None => {
1061                return Err(StateError {
1062                    code: "unavailable".into(),
1063                    message: "state store not configured".into(),
1064                });
1065            }
1066        };
1067        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1068            Ok(ctx) => ctx,
1069            Err(err) => {
1070                return Err(StateError {
1071                    code: "invalid-ctx".into(),
1072                    message: err.to_string(),
1073                });
1074            }
1075        };
1076        #[cfg(feature = "fault-injection")]
1077        {
1078            let exec_ctx = self.exec_ctx.as_ref();
1079            let flow_id = exec_ctx
1080                .map(|ctx| ctx.flow_id.as_str())
1081                .unwrap_or("unknown");
1082            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
1083            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
1084            let fault_ctx = FaultContext {
1085                pack_id: self.pack_id.as_str(),
1086                flow_id,
1087                node_id,
1088                attempt,
1089            };
1090            if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
1091                return Err(StateError {
1092                    code: "internal".into(),
1093                    message: err.to_string(),
1094                });
1095            }
1096        }
1097        let key = StoreStateKey::from(key);
1098        let value = serde_json::from_slice(&bytes)
1099            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
1100        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
1101            Ok(()) => Ok(StateOpAck::Ok),
1102            Err(err) => Err(StateError {
1103                code: "internal".into(),
1104                message: err.to_string(),
1105            }),
1106        }
1107    }
1108
1109    fn delete(
1110        &mut self,
1111        key: HostStateKey,
1112        ctx: Option<StateTenantCtx>,
1113    ) -> Result<StateOpAck, StateError> {
1114        let store = match self.state_store.as_ref() {
1115            Some(store) => store.clone(),
1116            None => {
1117                return Err(StateError {
1118                    code: "unavailable".into(),
1119                    message: "state store not configured".into(),
1120                });
1121            }
1122        };
1123        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1124            Ok(ctx) => ctx,
1125            Err(err) => {
1126                return Err(StateError {
1127                    code: "invalid-ctx".into(),
1128                    message: err.to_string(),
1129                });
1130            }
1131        };
1132        let key = StoreStateKey::from(key);
1133        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
1134            Ok(_) => Ok(StateOpAck::Ok),
1135            Err(err) => Err(StateError {
1136                code: "internal".into(),
1137                message: err.to_string(),
1138            }),
1139        }
1140    }
1141}
1142
1143impl TelemetryLoggerHost for HostState {
1144    fn log(
1145        &mut self,
1146        span: TelemetrySpanContext,
1147        fields: Vec<(String, String)>,
1148        _ctx: Option<TelemetryTenantCtx>,
1149    ) -> Result<TelemetryAck, TelemetryError> {
1150        if let Some(mock) = &self.mocks
1151            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
1152        {
1153            return Ok(TelemetryAck::Ok);
1154        }
1155        let mut map = serde_json::Map::new();
1156        for (k, v) in fields {
1157            map.insert(k, Value::String(v));
1158        }
1159        tracing::info!(
1160            tenant = %span.tenant,
1161            flow_id = %span.flow_id,
1162            node = ?span.node_id,
1163            provider = %span.provider,
1164            fields = %serde_json::Value::Object(map.clone()),
1165            "telemetry log from pack"
1166        );
1167        Ok(TelemetryAck::Ok)
1168    }
1169}
1170
1171impl RunnerHostHttp for HostState {
1172    fn request(
1173        &mut self,
1174        method: String,
1175        url: String,
1176        headers: Vec<String>,
1177        body: Option<Vec<u8>>,
1178    ) -> Result<Vec<u8>, String> {
1179        let req = HttpRequest {
1180            method,
1181            url,
1182            headers: headers
1183                .chunks(2)
1184                .filter_map(|chunk| {
1185                    if chunk.len() == 2 {
1186                        Some((chunk[0].clone(), chunk[1].clone()))
1187                    } else {
1188                        None
1189                    }
1190                })
1191                .collect(),
1192            body,
1193        };
1194        match HttpClientHost::send(self, req, None) {
1195            Ok(resp) => Ok(resp.body.unwrap_or_default()),
1196            Err(err) => Err(err.message),
1197        }
1198    }
1199}
1200
1201impl RunnerHostKv for HostState {
1202    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
1203        None
1204    }
1205
1206    fn put(&mut self, _ns: String, _key: String, _val: String) {}
1207}
1208
1209enum ManifestLoad {
1210    New {
1211        manifest: Box<greentic_types::PackManifest>,
1212        flows: PackFlows,
1213    },
1214    Legacy {
1215        manifest: Box<legacy_pack::PackManifest>,
1216        flows: PackFlows,
1217    },
1218}
1219
1220fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
1221    let mut archive = ZipArchive::new(File::open(path)?)
1222        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1223    let bytes = read_entry(&mut archive, "manifest.cbor")
1224        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
1225    match decode_pack_manifest(&bytes) {
1226        Ok(manifest) => {
1227            let cache = PackFlows::from_manifest(manifest.clone());
1228            Ok(ManifestLoad::New {
1229                manifest: Box::new(manifest),
1230                flows: cache,
1231            })
1232        }
1233        Err(err) => {
1234            tracing::debug!(
1235                error = %err,
1236                pack = %path.display(),
1237                "decode_pack_manifest failed for archive; falling back to legacy manifest"
1238            );
1239            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1240                .context("failed to decode legacy pack manifest from manifest.cbor")?;
1241            let flows = load_legacy_flows_from_archive(&mut archive, &legacy)?;
1242            Ok(ManifestLoad::Legacy {
1243                manifest: Box::new(legacy),
1244                flows,
1245            })
1246        }
1247    }
1248}
1249
1250fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
1251    let manifest_path = root.join("manifest.cbor");
1252    let bytes = std::fs::read(&manifest_path)
1253        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
1254    match decode_pack_manifest(&bytes) {
1255        Ok(manifest) => {
1256            let cache = PackFlows::from_manifest(manifest.clone());
1257            Ok(ManifestLoad::New {
1258                manifest: Box::new(manifest),
1259                flows: cache,
1260            })
1261        }
1262        Err(err) => {
1263            tracing::debug!(
1264                error = %err,
1265                pack = %root.display(),
1266                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
1267            );
1268            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1269                .context("failed to decode legacy pack manifest from manifest.cbor")?;
1270            let flows = load_legacy_flows_from_dir(root, &legacy)?;
1271            Ok(ManifestLoad::Legacy {
1272                manifest: Box::new(legacy),
1273                flows,
1274            })
1275        }
1276    }
1277}
1278
1279fn load_legacy_flows_from_dir(
1280    root: &Path,
1281    manifest: &legacy_pack::PackManifest,
1282) -> Result<PackFlows> {
1283    build_legacy_flows(manifest, |rel_path| {
1284        let path = root.join(rel_path);
1285        std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
1286    })
1287}
1288
1289fn load_legacy_flows_from_archive(
1290    archive: &mut ZipArchive<File>,
1291    manifest: &legacy_pack::PackManifest,
1292) -> Result<PackFlows> {
1293    build_legacy_flows(manifest, |rel_path| {
1294        read_entry(archive, rel_path).with_context(|| format!("missing flow json {}", rel_path))
1295    })
1296}
1297
1298fn build_legacy_flows(
1299    manifest: &legacy_pack::PackManifest,
1300    mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1301) -> Result<PackFlows> {
1302    let mut flows = HashMap::new();
1303    let mut descriptors = Vec::new();
1304
1305    for entry in &manifest.flows {
1306        let bytes = read_json(&entry.file_json)
1307            .with_context(|| format!("missing flow json {}", entry.file_json))?;
1308        let doc = parse_flow_doc_with_legacy_aliases(&bytes)?;
1309        let normalized = normalize_flow_doc(doc);
1310        let flow_ir = flow_doc_to_ir(normalized)?;
1311        let flow = flow_ir_to_flow(flow_ir)?;
1312
1313        descriptors.push(FlowDescriptor {
1314            id: entry.id.clone(),
1315            flow_type: entry.kind.clone(),
1316            pack_id: manifest.meta.pack_id.clone(),
1317            profile: manifest.meta.pack_id.clone(),
1318            version: manifest.meta.version.to_string(),
1319            description: None,
1320        });
1321        flows.insert(entry.id.clone(), flow);
1322    }
1323
1324    let mut entry_flows = manifest.meta.entry_flows.clone();
1325    if entry_flows.is_empty() {
1326        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1327    }
1328    let metadata = PackMetadata {
1329        pack_id: manifest.meta.pack_id.clone(),
1330        version: manifest.meta.version.to_string(),
1331        entry_flows,
1332        secret_requirements: Vec::new(),
1333    };
1334
1335    Ok(PackFlows {
1336        descriptors,
1337        flows,
1338        metadata,
1339    })
1340}
1341
1342fn parse_flow_doc_with_legacy_aliases(bytes: &[u8]) -> Result<FlowDoc> {
1343    let mut value: Value =
1344        serde_json::from_slice(bytes).context("failed to decode flow doc JSON")?;
1345    if let Some(map) = value.as_object_mut()
1346        && !map.contains_key("type")
1347        && let Some(flow_type) = map.remove("flow_type")
1348    {
1349        map.insert("type".to_string(), flow_type);
1350    }
1351    serde_json::from_value(value).context("failed to decode flow doc structure")
1352}
1353
1354pub struct ComponentState {
1355    pub host: HostState,
1356    wasi_ctx: WasiCtx,
1357    wasi_tls_ctx: WasiTlsCtx,
1358    wasi_http_ctx: WasiHttpCtx,
1359    resource_table: ResourceTable,
1360}
1361
1362/// Install the process-default rustls [`rustls::crypto::CryptoProvider`] exactly once.
1363///
1364/// `wasmtime-wasi-tls` 45's `RustlsProvider::default()` builds its
1365/// `rustls::ClientConfig` via `ClientConfig::builder()`, which resolves the
1366/// process-default provider. Our dependency graph enables BOTH the `ring` and
1367/// `aws_lc_rs` rustls backends, so there is no unambiguous implicit default and
1368/// the builder panics ("no process-level CryptoProvider available") the first
1369/// time [`WasiTlsCtxBuilder::build`] constructs the default provider. Install
1370/// the workspace-selected aws-lc-rs provider before that ever happens.
1371/// Idempotent: a returned `Err` means a default was already installed.
1372fn install_default_crypto_provider() {
1373    static ONCE: std::sync::Once = std::sync::Once::new();
1374    ONCE.call_once(|| {
1375        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
1376    });
1377}
1378
1379impl ComponentState {
1380    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1381        // Must run before `WasiTlsCtxBuilder::build()` below, which eagerly
1382        // constructs wasi-tls's default rustls provider.
1383        install_default_crypto_provider();
1384        let wasi_ctx = policy
1385            .instantiate()
1386            .context("failed to build WASI context")?;
1387        Ok(Self {
1388            host,
1389            wasi_ctx,
1390            wasi_tls_ctx: WasiTlsCtxBuilder::new().build(),
1391            wasi_http_ctx: WasiHttpCtx::new(),
1392            resource_table: ResourceTable::new(),
1393        })
1394    }
1395
1396    fn host_mut(&mut self) -> &mut HostState {
1397        &mut self.host
1398    }
1399
1400    fn should_cancel_host(&mut self) -> bool {
1401        false
1402    }
1403
1404    fn yield_now_host(&mut self) {
1405        // no-op cooperative yield
1406    }
1407}
1408
1409impl component_api::v0_4::greentic::component::control::Host for ComponentState {
1410    fn should_cancel(&mut self) -> bool {
1411        self.should_cancel_host()
1412    }
1413
1414    fn yield_now(&mut self) {
1415        self.yield_now_host();
1416    }
1417}
1418
1419impl component_api::v0_5::greentic::component::control::Host for ComponentState {
1420    fn should_cancel(&mut self) -> bool {
1421        self.should_cancel_host()
1422    }
1423
1424    fn yield_now(&mut self) {
1425        self.yield_now_host();
1426    }
1427}
1428
1429fn add_component_control_instance(
1430    linker: &mut Linker<ComponentState>,
1431    name: &str,
1432) -> wasmtime::Result<()> {
1433    let mut inst = linker.instance(name)?;
1434    inst.func_wrap(
1435        "should-cancel",
1436        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1437            let host = caller.data_mut();
1438            Ok((host.should_cancel_host(),))
1439        },
1440    )?;
1441    inst.func_wrap(
1442        "yield-now",
1443        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1444            let host = caller.data_mut();
1445            host.yield_now_host();
1446            Ok(())
1447        },
1448    )?;
1449    Ok(())
1450}
1451
1452fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1453    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1454    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1455    Ok(())
1456}
1457
1458/// Reduced-authority linker for `identify-instance` and
1459/// `describe-identify-instance` probes (M1 IID Phase D).
1460///
1461/// Identity probes match an inbound webhook payload against known
1462/// per-endpoint discriminators (Telegram secret-token header, Slack
1463/// signing-secret, Teams JWT issuer, …) and return an endpoint id
1464/// or `none`. The WIT contract is a pure projection over `(headers,
1465/// body)` — no outbound HTTP, no persistent state, no secrets.
1466///
1467/// # Why this delegates to `register_all` (Wasmtime eager-import constraint)
1468///
1469/// Wasmtime's [`Linker::instantiate_pre`] type-checks the component's
1470/// **entire** import graph eagerly — not just the imports reachable from
1471/// the export the caller intends to invoke. A provider component that
1472/// exports `instance-identity-api` alongside `schema-core-api` typically
1473/// also imports `http-client`, `secrets-store`, etc. for the latter.
1474/// If the linker omits those imports, `instantiate_pre` fails with
1475/// `"a matching implementation was not found in the linker"` before the
1476/// identity export is even checked.
1477///
1478/// The ideal probe linker would register deny-shim handlers that satisfy
1479/// the import graph but trap on actual invocation. That requires
1480/// deny-shim support in `greentic-interfaces-wasmtime` (tracked as a
1481/// follow-up). Until then, probes use the same linker surface as normal
1482/// execution, with state-store disabled.
1483///
1484/// The reduced-authority boundary is enforced at the WASI policy layer
1485/// instead: probe call sites construct a locked-down
1486/// [`RunnerWasiPolicy`](crate::wasi::RunnerWasiPolicy) with no
1487/// preopens, no env passthrough, and no stdio inheritance. See
1488/// [`RunnerWasiPolicy::probe()`](crate::wasi::RunnerWasiPolicy::probe)
1489/// and the `probe_wasi_policy_is_locked_down` test.
1490pub fn register_identity_probe(linker: &mut Linker<ComponentState>) -> Result<()> {
1491    // Delegates to `register_all` with state-store disabled. See doc
1492    // comment above for the rationale (Wasmtime eager-import validation).
1493    register_all(linker, false)
1494}
1495
1496#[cfg(test)]
1497mod register_identity_probe_tests {
1498    use super::*;
1499
1500    /// Verify that [`register_identity_probe`] successfully links all
1501    /// imports needed by real provider components (wasi-core, wasi-tls,
1502    /// wasi-http, http-client, secrets-store, telemetry, etc.).
1503    ///
1504    /// Before this fix, the probe linker omitted most host imports.
1505    /// Wasmtime's `instantiate_pre` validates the **entire** import
1506    /// graph eagerly, so any provider with runtime imports (all real
1507    /// providers) would fail before the identity export was checked.
1508    #[test]
1509    fn register_identity_probe_links_successfully() {
1510        let engine = wasmtime::Engine::default();
1511        let mut linker = Linker::<ComponentState>::new(&engine);
1512        register_identity_probe(&mut linker).expect("probe linker registers all imports");
1513    }
1514
1515    /// Verify that the probe WASI policy has no preopens, no env, and
1516    /// no stdio — the only reduced-authority boundary available today.
1517    #[test]
1518    fn probe_wasi_policy_is_locked_down() {
1519        let policy = RunnerWasiPolicy::probe();
1520        assert!(!policy.inherit_stdio, "probe WASI must not inherit stdio");
1521        assert!(
1522            policy.preopens.is_empty(),
1523            "probe WASI must have no preopens"
1524        );
1525        assert!(
1526            policy.env_allow.is_empty(),
1527            "probe WASI must not allow env vars"
1528        );
1529        assert!(
1530            policy.env_set.is_empty(),
1531            "probe WASI must not set env vars"
1532        );
1533    }
1534}
1535
1536pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1537    add_wasi_to_linker(linker)?;
1538
1539    // Add wasi-tls types and turn on the feature in linker
1540    let mut opts = LinkOptions::default();
1541    opts.tls(true);
1542    wasmtime_wasi_tls::p2::add_to_linker(linker, &opts)?;
1543
1544    // Add wasi-http types and turn on the feature in linker
1545    add_wasi_http_to_linker(linker)?;
1546
1547    add_all_v1_to_linker(
1548        linker,
1549        HostFns {
1550            http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1551            http_client: Some(|state: &mut ComponentState| state.host_mut()),
1552            oauth_broker: None,
1553            runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1554            runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1555            telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
1556            state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1557            secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1558            secrets_store: None,
1559            runtime_config: Some(|state: &mut ComponentState| state.host_mut()),
1560        },
1561    )?;
1562    add_http_client_client_world_aliases(linker)?;
1563    Ok(())
1564}
1565
1566fn add_http_client_client_world_aliases(linker: &mut Linker<ComponentState>) -> Result<()> {
1567    let mut inst_v1_1 = linker.instance("greentic:http/client@1.1.0")?;
1568    inst_v1_1.func_wrap(
1569        "send",
1570        move |mut caller: StoreContextMut<'_, ComponentState>,
1571              (req, opts, ctx): (
1572            http_client_client_alias::Request,
1573            Option<http_client_client_alias::RequestOptions>,
1574            Option<http_client_client_alias::TenantCtx>,
1575        )| {
1576            let host = caller.data_mut().host_mut();
1577            let result = HttpClientHostV1_1::send(
1578                host,
1579                alias_request_to_host(req),
1580                opts.map(alias_request_options_to_host),
1581                ctx.map(alias_tenant_ctx_to_host),
1582            );
1583            Ok((match result {
1584                Ok(resp) => Ok(alias_response_from_host(resp)),
1585                Err(err) => Err(alias_error_from_host(err)),
1586            },))
1587        },
1588    )?;
1589    let mut inst_v1_0 = linker.instance("greentic:http/client@1.0.0")?;
1590    inst_v1_0.func_wrap(
1591        "send",
1592        move |mut caller: StoreContextMut<'_, ComponentState>,
1593              (req, ctx): (
1594            host_http_client::Request,
1595            Option<host_http_client::TenantCtx>,
1596        )| {
1597            let host = caller.data_mut().host_mut();
1598            let result = HttpClientHost::send(host, req, ctx);
1599            Ok((result,))
1600        },
1601    )?;
1602    Ok(())
1603}
1604
1605fn alias_request_to_host(req: http_client_client_alias::Request) -> host_http_client::RequestV1_1 {
1606    host_http_client::RequestV1_1 {
1607        method: req.method,
1608        url: req.url,
1609        headers: req.headers,
1610        body: req.body,
1611    }
1612}
1613
1614fn alias_request_options_to_host(
1615    opts: http_client_client_alias::RequestOptions,
1616) -> host_http_client::RequestOptionsV1_1 {
1617    host_http_client::RequestOptionsV1_1 {
1618        timeout_ms: opts.timeout_ms,
1619        allow_insecure: opts.allow_insecure,
1620        follow_redirects: opts.follow_redirects,
1621    }
1622}
1623
1624fn alias_tenant_ctx_to_host(
1625    ctx: http_client_client_alias::TenantCtx,
1626) -> host_http_client::TenantCtxV1_1 {
1627    host_http_client::TenantCtxV1_1 {
1628        env: ctx.env,
1629        tenant: ctx.tenant,
1630        tenant_id: ctx.tenant_id,
1631        team: ctx.team,
1632        team_id: ctx.team_id,
1633        user: ctx.user,
1634        user_id: ctx.user_id,
1635        trace_id: ctx.trace_id,
1636        correlation_id: ctx.correlation_id,
1637        i18n_id: ctx.i18n_id,
1638        attributes: ctx.attributes,
1639        session_id: ctx.session_id,
1640        flow_id: ctx.flow_id,
1641        node_id: ctx.node_id,
1642        provider_id: ctx.provider_id,
1643        deadline_ms: ctx.deadline_ms,
1644        attempt: ctx.attempt,
1645        idempotency_key: ctx.idempotency_key,
1646        impersonation: ctx.impersonation.map(|imp| http_types_v1_1::Impersonation {
1647            actor_id: imp.actor_id,
1648            reason: imp.reason,
1649        }),
1650    }
1651}
1652
1653fn alias_response_from_host(
1654    resp: host_http_client::ResponseV1_1,
1655) -> http_client_client_alias::Response {
1656    http_client_client_alias::Response {
1657        status: resp.status,
1658        headers: resp.headers,
1659        body: resp.body,
1660    }
1661}
1662
1663fn alias_error_from_host(
1664    err: host_http_client::HttpClientErrorV1_1,
1665) -> http_client_client_alias::HostError {
1666    http_client_client_alias::HostError {
1667        code: err.code,
1668        message: err.message,
1669    }
1670}
1671
1672impl OAuthHostContext for ComponentState {
1673    fn tenant_id(&self) -> &str {
1674        &self.host.config.tenant
1675    }
1676
1677    fn env(&self) -> &str {
1678        &self.host.default_env
1679    }
1680
1681    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
1682        &mut self.host.oauth_host
1683    }
1684
1685    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
1686        self.host.oauth_config.as_ref()
1687    }
1688}
1689
1690impl WasiView for ComponentState {
1691    fn ctx(&mut self) -> WasiCtxView<'_> {
1692        WasiCtxView {
1693            ctx: &mut self.wasi_ctx,
1694            table: &mut self.resource_table,
1695        }
1696    }
1697}
1698
1699impl WasiHttpView for ComponentState {
1700    fn http(&mut self) -> WasiHttpCtxView<'_> {
1701        WasiHttpCtxView {
1702            ctx: &mut self.wasi_http_ctx,
1703            table: &mut self.resource_table,
1704            hooks: Default::default(),
1705        }
1706    }
1707}
1708
1709impl WasiTlsView for ComponentState {
1710    fn tls(&mut self) -> WasiTlsCtxView<'_> {
1711        WasiTlsCtxView {
1712            ctx: &mut self.wasi_tls_ctx,
1713            table: &mut self.resource_table,
1714        }
1715    }
1716}
1717
1718#[allow(unsafe_code)]
1719unsafe impl Send for ComponentState {}
1720#[allow(unsafe_code)]
1721unsafe impl Sync for ComponentState {}
1722
1723impl PackRuntime {
1724    fn allows_state_store(&self, component_ref: &str) -> bool {
1725        if self.state_store.is_none() {
1726            return false;
1727        }
1728        if !self.config.state_store_policy.allow {
1729            return false;
1730        }
1731        let Some(manifest) = self.component_manifests.get(component_ref) else {
1732            // No manifest entry — allow state-store; Wasmtime rejects if not imported.
1733            return true;
1734        };
1735        // If manifest declares host.state capabilities, honour them.
1736        // If host.state is None (not declared in manifest), default to true so
1737        // components whose CBOR manifest omits the field still get state-store
1738        // linked — Wasmtime will reject at instantiation if not actually imported.
1739        manifest
1740            .capabilities
1741            .host
1742            .state
1743            .as_ref()
1744            .map(|caps| caps.read || caps.write)
1745            .unwrap_or(true)
1746    }
1747
1748    pub fn contains_component(&self, component_ref: &str) -> bool {
1749        self.components.contains_key(component_ref)
1750    }
1751
1752    /// Returns a clonable handle to the pack's state store, when one is
1753    /// configured. Used by the flow engine's built-in `state.get`/`state.set`
1754    /// operators which call into the same store that WASM components read
1755    /// through their `state.read`/`state.write` host imports.
1756    pub fn state_store_handle(&self) -> Option<crate::storage::DynStateStore> {
1757        self.state_store.clone()
1758    }
1759
1760    #[allow(clippy::too_many_arguments)]
1761    pub async fn load(
1762        path: impl AsRef<Path>,
1763        config: Arc<HostConfig>,
1764        mocks: Option<Arc<MockLayer>>,
1765        archive_source: Option<&Path>,
1766        session_store: Option<DynSessionStore>,
1767        state_store: Option<DynStateStore>,
1768        wasi_policy: Arc<RunnerWasiPolicy>,
1769        secrets: DynSecretsManager,
1770        oauth_config: Option<OAuthBrokerConfig>,
1771        verify_archive: bool,
1772        component_resolution: ComponentResolution,
1773    ) -> Result<Self> {
1774        let path = path.as_ref();
1775        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1776        let path_meta = std::fs::metadata(&safe_path).ok();
1777        let is_dir = path_meta
1778            .as_ref()
1779            .map(|meta| meta.is_dir())
1780            .unwrap_or(false);
1781        let is_component = !is_dir
1782            && safe_path
1783                .extension()
1784                .and_then(|ext| ext.to_str())
1785                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1786                .unwrap_or(false);
1787        let archive_hint_path = if let Some(source) = archive_source {
1788            let (_, normalized) = normalize_pack_path(source)?;
1789            Some(normalized)
1790        } else if is_component || is_dir {
1791            None
1792        } else {
1793            Some(safe_path.clone())
1794        };
1795        let archive_hint = archive_hint_path.as_deref();
1796        if verify_archive {
1797            if let Some(verify_target) = archive_hint.and_then(|p| {
1798                std::fs::metadata(p)
1799                    .ok()
1800                    .filter(|meta| meta.is_file())
1801                    .map(|_| p)
1802            }) {
1803                verify::verify_pack(verify_target).await?;
1804                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1805            } else {
1806                tracing::debug!("skipping archive verification (no archive source)");
1807            }
1808        }
1809        let engine = Engine::default();
1810        let engine_profile =
1811            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1812        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1813        let mut metadata = PackMetadata::fallback(&safe_path);
1814        let mut manifest = None;
1815        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1816        let mut flows = None;
1817        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1818            if is_dir {
1819                Some(safe_path.clone())
1820            } else {
1821                None
1822            }
1823        });
1824        let (pack_assets_dir, assets_tempdir) =
1825            locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
1826        let setup_yaml_exists = pack_assets_dir
1827            .as_ref()
1828            .map(|dir| dir.join("setup.yaml").is_file())
1829            .unwrap_or(false);
1830        tracing::info!(
1831            pack_root = %safe_path.display(),
1832            assets_setup_yaml_exists = setup_yaml_exists,
1833            "pack unpack metadata"
1834        );
1835
1836        if let Some(root) = materialized_root.as_ref() {
1837            match load_manifest_and_flows_from_dir(root) {
1838                Ok(ManifestLoad::New {
1839                    manifest: m,
1840                    flows: cache,
1841                }) => {
1842                    metadata = cache.metadata.clone();
1843                    manifest = Some(*m);
1844                    flows = Some(cache);
1845                }
1846                Ok(ManifestLoad::Legacy {
1847                    manifest: m,
1848                    flows: cache,
1849                }) => {
1850                    metadata = cache.metadata.clone();
1851                    legacy_manifest = Some(m);
1852                    flows = Some(cache);
1853                }
1854                Err(err) => {
1855                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1856                }
1857            }
1858        }
1859
1860        if manifest.is_none()
1861            && legacy_manifest.is_none()
1862            && let Some(archive_path) = archive_hint
1863        {
1864            let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
1865                format!(
1866                    "failed to load manifest.cbor from {}",
1867                    archive_path.display()
1868                )
1869            })?;
1870            match manifest_load {
1871                ManifestLoad::New {
1872                    manifest: m,
1873                    flows: cache,
1874                } => {
1875                    metadata = cache.metadata.clone();
1876                    manifest = Some(*m);
1877                    flows = Some(cache);
1878                }
1879                ManifestLoad::Legacy {
1880                    manifest: m,
1881                    flows: cache,
1882                } => {
1883                    metadata = cache.metadata.clone();
1884                    legacy_manifest = Some(m);
1885                    flows = Some(cache);
1886                }
1887            }
1888        }
1889        #[cfg(feature = "fault-injection")]
1890        {
1891            let fault_ctx = FaultContext {
1892                pack_id: metadata.pack_id.as_str(),
1893                flow_id: "unknown",
1894                node_id: None,
1895                attempt: 1,
1896            };
1897            maybe_fail(FaultPoint::PackResolve, fault_ctx)
1898                .map_err(|err| anyhow!(err.to_string()))?;
1899        }
1900        let mut pack_lock = None;
1901        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1902            pack_lock = load_pack_lock(&root)?;
1903            if pack_lock.is_some() {
1904                break;
1905            }
1906        }
1907        let component_sources_payload = if pack_lock.is_none() {
1908            if let Some(manifest) = manifest.as_ref() {
1909                manifest
1910                    .get_component_sources_v1()
1911                    .context("invalid component sources extension")?
1912            } else {
1913                None
1914            }
1915        } else {
1916            None
1917        };
1918        let component_sources = if let Some(lock) = pack_lock.as_ref() {
1919            Some(component_sources_table_from_pack_lock(
1920                lock,
1921                component_resolution.allow_missing_hash,
1922            )?)
1923        } else {
1924            component_sources_table(component_sources_payload.as_ref())?
1925        };
1926        let components = if is_component {
1927            let wasm_bytes = fs::read(&safe_path).await?;
1928            metadata = PackMetadata::from_wasm(&wasm_bytes)
1929                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1930            let name = safe_path
1931                .file_stem()
1932                .map(|s| s.to_string_lossy().to_string())
1933                .unwrap_or_else(|| "component".to_string());
1934            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1935            let mut map = HashMap::new();
1936            map.insert(
1937                name.clone(),
1938                PackComponent {
1939                    name,
1940                    version: metadata.version.clone(),
1941                    component,
1942                },
1943            );
1944            map
1945        } else {
1946            let specs = component_specs(
1947                manifest.as_ref(),
1948                legacy_manifest.as_deref(),
1949                component_sources_payload.as_ref(),
1950                pack_lock.as_ref(),
1951            );
1952            if specs.is_empty() {
1953                HashMap::new()
1954            } else {
1955                let mut loaded = HashMap::new();
1956                let mut missing: HashSet<String> =
1957                    specs.iter().map(|spec| spec.id.clone()).collect();
1958                let mut searched = Vec::new();
1959
1960                if !component_resolution.overrides.is_empty() {
1961                    load_components_from_overrides(
1962                        &cache,
1963                        &engine,
1964                        &component_resolution.overrides,
1965                        &specs,
1966                        &mut missing,
1967                        &mut loaded,
1968                    )
1969                    .await?;
1970                    searched.push("override map".to_string());
1971                }
1972
1973                if let Some(component_sources) = component_sources.as_ref() {
1974                    load_components_from_sources(
1975                        &cache,
1976                        &engine,
1977                        component_sources,
1978                        &component_resolution,
1979                        &specs,
1980                        &mut missing,
1981                        &mut loaded,
1982                        materialized_root.as_deref(),
1983                        archive_hint,
1984                    )
1985                    .await?;
1986                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1987                }
1988
1989                if let Some(root) = materialized_root.as_ref() {
1990                    load_components_from_dir(
1991                        &cache,
1992                        &engine,
1993                        root,
1994                        &specs,
1995                        &mut missing,
1996                        &mut loaded,
1997                    )
1998                    .await?;
1999                    searched.push(format!("components dir {}", root.display()));
2000                }
2001
2002                if let Some(archive_path) = archive_hint {
2003                    load_components_from_archive(
2004                        &cache,
2005                        &engine,
2006                        archive_path,
2007                        &specs,
2008                        &mut missing,
2009                        &mut loaded,
2010                    )
2011                    .await?;
2012                    searched.push(format!("archive {}", archive_path.display()));
2013                }
2014
2015                if !missing.is_empty() {
2016                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
2017                    let sources = if searched.is_empty() {
2018                        "no component sources".to_string()
2019                    } else {
2020                        searched.join(", ")
2021                    };
2022                    bail!(
2023                        "components missing: {}; looked in {}",
2024                        missing_list,
2025                        sources
2026                    );
2027                }
2028
2029                loaded
2030            }
2031        };
2032        let http_client = Arc::clone(&HTTP_CLIENT);
2033        let mut component_manifests = HashMap::new();
2034        if let Some(manifest) = manifest.as_ref() {
2035            for component in &manifest.components {
2036                component_manifests.insert(component.id.as_str().to_string(), component.clone());
2037            }
2038        }
2039        let mut pack_policy = (*wasi_policy).clone();
2040        if let Some(dir) = pack_assets_dir {
2041            tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
2042            pack_policy =
2043                pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
2044        }
2045        let wasi_policy = Arc::new(pack_policy);
2046        Ok(Self {
2047            path: safe_path,
2048            archive_path: archive_hint.map(Path::to_path_buf),
2049            config,
2050            engine,
2051            metadata,
2052            manifest,
2053            legacy_manifest,
2054            component_manifests,
2055            mocks,
2056            flows,
2057            components,
2058            http_client,
2059            pre_cache: Mutex::new(HashMap::new()),
2060            session_store,
2061            state_store,
2062            wasi_policy,
2063            assets_tempdir,
2064            provider_registry: RwLock::new(None),
2065            identify_hint_cache: RwLock::new(HashMap::new()),
2066            secrets,
2067            oauth_config,
2068            cache,
2069            runtime_config_non_secret: None,
2070            runtime_refs: None,
2071        })
2072    }
2073
2074    /// Inject the `pack-config.v1.non_secret` map for this pack. Called by
2075    /// the producer (greentic-start, C4.3) after loading the deployed
2076    /// `PackConfig`. Passing `None` clears any previously-set map.
2077    pub fn set_runtime_config_non_secret(&mut self, map: Option<Arc<BTreeMap<String, Value>>>) {
2078        self.runtime_config_non_secret = map;
2079    }
2080
2081    /// Read-only accessor for the injected `pack-config.v1.non_secret` map.
2082    /// Used by the revision loader's tests to assert producer plumbing.
2083    pub fn runtime_config_non_secret(&self) -> Option<&Arc<BTreeMap<String, Value>>> {
2084        self.runtime_config_non_secret.as_ref()
2085    }
2086
2087    /// Inject the `pack-config.v1.runtime_refs` channel (C5): per-pack
2088    /// `key → URI` bindings plus the env-shared resolver. Called by
2089    /// greentic-start after loading the deployed `PackConfig`. Passing
2090    /// `None` clears any previously-set injection.
2091    pub fn set_runtime_refs(&mut self, injection: Option<RuntimeRefsInjection>) {
2092        self.runtime_refs = injection;
2093    }
2094
2095    /// Read-only accessor for the injected runtime-refs channel. Used by
2096    /// the revision loader's tests to assert producer plumbing.
2097    pub fn runtime_refs(&self) -> Option<&RuntimeRefsInjection> {
2098        self.runtime_refs.as_ref()
2099    }
2100
2101    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
2102        if let Some(cache) = &self.flows {
2103            return Ok(cache.descriptors.clone());
2104        }
2105        if let Some(manifest) = &self.manifest {
2106            let descriptors = manifest
2107                .flows
2108                .iter()
2109                .map(|flow| FlowDescriptor {
2110                    id: flow.id.as_str().to_string(),
2111                    flow_type: flow_kind_to_str(flow.kind).to_string(),
2112                    pack_id: manifest.pack_id.as_str().to_string(),
2113                    profile: manifest.pack_id.as_str().to_string(),
2114                    version: manifest.version.to_string(),
2115                    description: None,
2116                })
2117                .collect();
2118            return Ok(descriptors);
2119        }
2120        Ok(Vec::new())
2121    }
2122
2123    #[allow(dead_code)]
2124    pub async fn run_flow(
2125        &self,
2126        flow_id: &str,
2127        input: serde_json::Value,
2128    ) -> Result<serde_json::Value> {
2129        let pack = Arc::new(
2130            PackRuntime::load(
2131                &self.path,
2132                Arc::clone(&self.config),
2133                self.mocks.clone(),
2134                self.archive_path.as_deref(),
2135                self.session_store.clone(),
2136                self.state_store.clone(),
2137                Arc::clone(&self.wasi_policy),
2138                self.secrets.clone(),
2139                self.oauth_config.clone(),
2140                false,
2141                ComponentResolution::default(),
2142            )
2143            .await?,
2144        );
2145
2146        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
2147        let retry_config = self.config.retry_config().into();
2148        let mocks = pack.mocks.as_deref();
2149        let tenant = self.config.tenant.as_str();
2150
2151        let ctx = FlowContext {
2152            tenant,
2153            pack_id: pack.metadata().pack_id.as_str(),
2154            flow_id,
2155            node_id: None,
2156            tool: None,
2157            action: None,
2158            session_id: None,
2159            provider_id: None,
2160            retry_config,
2161            attempt: 1,
2162            observer: None,
2163            mocks,
2164        };
2165
2166        let execution = engine.execute(ctx, input).await?;
2167        match execution.status {
2168            FlowStatus::Completed => Ok(execution.output),
2169            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
2170                "status": "pending",
2171                "reason": wait.reason,
2172                "resume": wait.snapshot,
2173                "response": execution.output,
2174            })),
2175        }
2176    }
2177
2178    pub async fn invoke_component(
2179        &self,
2180        component_ref: &str,
2181        ctx: ComponentExecCtx,
2182        operation: &str,
2183        config_json: Option<String>,
2184        input_json: String,
2185    ) -> Result<Value> {
2186        let component_ref = resolve_component_key(component_ref, operation, |key| {
2187            self.components.contains_key(key)
2188        });
2189        let pack_component = self
2190            .components
2191            .get(component_ref)
2192            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
2193        let engine = self.engine.clone();
2194        let config = Arc::clone(&self.config);
2195        let http_client = Arc::clone(&self.http_client);
2196        let mocks = self.mocks.clone();
2197        let session_store = self.session_store.clone();
2198        let state_store = self.state_store.clone();
2199        let secrets = Arc::clone(&self.secrets);
2200        let oauth_config = self.oauth_config.clone();
2201        let wasi_policy = Arc::clone(&self.wasi_policy);
2202        let pack_id = self.metadata().pack_id.clone();
2203        let allow_state_store = self.allows_state_store(component_ref);
2204        let component = pack_component.component.clone();
2205        let component_ref_owned = component_ref.to_string();
2206        let operation_owned = operation.to_string();
2207        let input_owned =
2208            Self::merge_component_config_into_input_json(config_json.as_deref(), &input_json)
2209                .context("merge component config into invocation payload")?;
2210        let ctx_owned = ctx;
2211        let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2212        let runtime_refs = self.runtime_refs.clone();
2213
2214        run_on_wasi_thread("component.invoke", move || {
2215            let mut linker = Linker::new(&engine);
2216            register_all(&mut linker, allow_state_store)?;
2217            add_component_control_to_linker(&mut linker)?;
2218
2219            let host_state = HostState::new(
2220                pack_id.clone(),
2221                config,
2222                http_client,
2223                mocks,
2224                session_store,
2225                state_store,
2226                secrets,
2227                oauth_config,
2228                Some(ctx_owned.clone()),
2229                Some(component_ref_owned.clone()),
2230                false,
2231                runtime_config_non_secret,
2232                runtime_refs,
2233            )?;
2234            let store_state = ComponentState::new(host_state, wasi_policy)?;
2235            let mut store = wasmtime::Store::new(&engine, store_state);
2236
2237            let invoke_result = HostState::instantiate_component_result(
2238                &mut linker,
2239                &mut store,
2240                &component,
2241                &ctx_owned,
2242                &component_ref_owned,
2243                &operation_owned,
2244                &input_owned,
2245            )?;
2246            HostState::convert_invoke_result(invoke_result)
2247        })
2248    }
2249
2250    fn merge_component_config_into_input_json(
2251        config_json: Option<&str>,
2252        input_json: &str,
2253    ) -> Result<String> {
2254        let Some(config_json) = config_json else {
2255            return Ok(input_json.to_string());
2256        };
2257
2258        let config_value: Value =
2259            serde_json::from_str(config_json).context("parse component config JSON")?;
2260
2261        if let Ok(mut invocation) =
2262            serde_json::from_str::<greentic_types::InvocationEnvelope>(input_json)
2263        {
2264            let payload_value = serde_json::from_slice(&invocation.payload).unwrap_or_else(|_| {
2265                Value::String(String::from_utf8_lossy(&invocation.payload).into_owned())
2266            });
2267            invocation.payload = serde_json::to_vec(&serde_json::json!({
2268                "config": config_value,
2269                "input": payload_value,
2270            }))
2271            .context("serialize merged invocation payload")?;
2272            return serde_json::to_string(&invocation)
2273                .context("serialize merged invocation envelope");
2274        }
2275
2276        let input_value = serde_json::from_str(input_json)
2277            .unwrap_or_else(|_| Value::String(input_json.to_string()));
2278        serde_json::to_string(&serde_json::json!({
2279            "config": config_value,
2280            "input": input_value,
2281        }))
2282        .context("serialize merged component input")
2283    }
2284
2285    pub fn resolve_provider(
2286        &self,
2287        provider_id: Option<&str>,
2288        provider_type: Option<&str>,
2289    ) -> Result<ProviderBinding> {
2290        let registry = self.provider_registry()?;
2291        registry.resolve(provider_id, provider_type)
2292    }
2293
2294    pub async fn invoke_provider(
2295        &self,
2296        binding: &ProviderBinding,
2297        ctx: ComponentExecCtx,
2298        op: &str,
2299        input_json: Vec<u8>,
2300    ) -> Result<Value> {
2301        let component_ref_owned = binding.component_ref.clone();
2302        let pack_component = self.components.get(&component_ref_owned).with_context(|| {
2303            format!("provider component '{component_ref_owned}' not found in pack")
2304        })?;
2305        let component = pack_component.component.clone();
2306
2307        let engine = self.engine.clone();
2308        let config = Arc::clone(&self.config);
2309        let http_client = Arc::clone(&self.http_client);
2310        let mocks = self.mocks.clone();
2311        let session_store = self.session_store.clone();
2312        let state_store = self.state_store.clone();
2313        let secrets = Arc::clone(&self.secrets);
2314        let oauth_config = self.oauth_config.clone();
2315        let wasi_policy = Arc::clone(&self.wasi_policy);
2316        let pack_id = self.metadata().pack_id.clone();
2317        let allow_state_store = self.allows_state_store(&component_ref_owned);
2318        let input_owned = input_json;
2319        let op_owned = op.to_string();
2320        let ctx_owned = ctx;
2321        let world = binding.world.clone();
2322        let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2323        let runtime_refs = self.runtime_refs.clone();
2324
2325        run_on_wasi_thread("provider.invoke", move || {
2326            let mut linker = Linker::new(&engine);
2327            register_all(&mut linker, allow_state_store)?;
2328            add_component_control_to_linker(&mut linker)?;
2329            let host_state = HostState::new(
2330                pack_id.clone(),
2331                config,
2332                http_client,
2333                mocks,
2334                session_store,
2335                state_store,
2336                secrets,
2337                oauth_config,
2338                Some(ctx_owned.clone()),
2339                Some(component_ref_owned.clone()),
2340                true,
2341                runtime_config_non_secret,
2342                runtime_refs,
2343            )?;
2344            let store_state = ComponentState::new(host_state, wasi_policy)?;
2345            let mut store = wasmtime::Store::new(&engine, store_state);
2346            let use_schema_core_schema = world.contains("provider-schema-core");
2347            let use_schema_core_path = world.contains("provider/schema-core");
2348            let result = if use_schema_core_schema {
2349                let pre_instance = linker.instantiate_pre(component.as_ref())?;
2350                let pre: SchemaSchemaCorePre<ComponentState> =
2351                    SchemaSchemaCorePre::new(pre_instance)?;
2352                let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2353                let provider = bindings.greentic_provider_schema_core_schema_core_api();
2354                provider.call_invoke(&mut store, &op_owned, &input_owned)?
2355            } else if use_schema_core_path {
2356                let pre_instance = linker.instantiate_pre(component.as_ref())?;
2357                let path_attempt = (|| -> Result<Vec<u8>> {
2358                    let pre: PathSchemaCorePre<ComponentState> =
2359                        PathSchemaCorePre::new(pre_instance)?;
2360                    let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2361                    let provider = bindings.greentic_provider_schema_core_api();
2362                    Ok(provider.call_invoke(&mut store, &op_owned, &input_owned)?)
2363                })();
2364                match path_attempt {
2365                    Ok(value) => value,
2366                    Err(path_err)
2367                        if path_err.to_string().contains("no exported instance named") =>
2368                    {
2369                        let pre_instance = linker.instantiate_pre(component.as_ref())?;
2370                        let pre: SchemaSchemaCorePre<ComponentState> =
2371                            SchemaSchemaCorePre::new(pre_instance)?;
2372                        let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2373                        let provider = bindings.greentic_provider_schema_core_schema_core_api();
2374                        provider.call_invoke(&mut store, &op_owned, &input_owned)?
2375                    }
2376                    Err(path_err) => return Err(path_err),
2377                }
2378            } else {
2379                let pre_instance = linker.instantiate_pre(component.as_ref())?;
2380                let pre: LegacySchemaCorePre<ComponentState> =
2381                    LegacySchemaCorePre::new(pre_instance)?;
2382                let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2383                let provider = bindings.greentic_provider_core_schema_core_api();
2384                provider.call_invoke(&mut store, &op_owned, &input_owned)?
2385            };
2386            deserialize_json_bytes(result)
2387        })
2388    }
2389
2390    /// Call the provider component's `identify-instance` export
2391    /// (`greentic:provider-instance-identity@0.1.0`) with the inbound
2392    /// payload bytes. Returns an [`IdentifyOutcome`] — see the variant
2393    /// docs for the per-case contract.
2394    ///
2395    /// # Payload shape (M1 IID.4d wrapper)
2396    ///
2397    /// `payload` is forwarded opaque to the component. The shape is set by
2398    /// the caller; the M1 IID.4d wrapper convention from `greentic-start`
2399    /// is `{headers: [{name,value}], body: <parsed-or-null>}` so providers
2400    /// whose discriminator lives in HTTP headers (Telegram via
2401    /// `x-telegram-bot-api-secret-token`) can identify the instance the
2402    /// same call shape that body-based providers (Teams, Slack, Webex,
2403    /// etc.) use. See the docstring on
2404    /// `greentic:provider-instance-identity/instance-identity-api.identify-instance`
2405    /// for the full contract; this host method does not parse or
2406    /// validate the bytes.
2407    ///
2408    /// # Host authority on identity probes
2409    ///
2410    /// The linker registers the full host import surface (Wasmtime
2411    /// validates all imports eagerly at `instantiate_pre`, not just
2412    /// those reachable from the invoked export). The WASI sandbox is
2413    /// locked down: no preopens, no env, no stdio. Deny-shim linker
2414    /// handlers (trap on call, satisfy at link time) are a follow-up
2415    /// in `greentic-interfaces-wasmtime`. See [`register_identity_probe`].
2416    pub async fn invoke_identify_instance(
2417        &self,
2418        binding: &ProviderBinding,
2419        payload: Vec<u8>,
2420    ) -> Result<IdentifyOutcome> {
2421        let component_ref_owned = binding.component_ref.clone();
2422        let pack_component = self.components.get(&component_ref_owned).with_context(|| {
2423            format!("provider component '{component_ref_owned}' not found in pack")
2424        })?;
2425        let component = pack_component.component.clone();
2426
2427        let engine = self.engine.clone();
2428        let config = Arc::clone(&self.config);
2429        let http_client = Arc::clone(&self.http_client);
2430        let mocks = self.mocks.clone();
2431        let session_store = self.session_store.clone();
2432        let state_store = self.state_store.clone();
2433        let secrets = Arc::clone(&self.secrets);
2434        let oauth_config = self.oauth_config.clone();
2435        let pack_id = self.metadata().pack_id.clone();
2436
2437        // Locked-down WASI policy: no preopens, no env, no stdio.
2438        // The linker registers all imports (Wasmtime requires it for
2439        // instantiate_pre), but the WASI sandbox is the tightest we
2440        // can enforce today. See [`register_identity_probe`] docs.
2441        let wasi_policy = Arc::new(RunnerWasiPolicy::probe());
2442        let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2443        let runtime_refs = self.runtime_refs.clone();
2444        run_on_wasi_thread("provider.identify_instance", move || {
2445            let mut linker = Linker::new(&engine);
2446            register_identity_probe(&mut linker)?;
2447            let host_state = HostState::new(
2448                pack_id.clone(),
2449                config,
2450                http_client,
2451                mocks,
2452                session_store,
2453                state_store,
2454                secrets,
2455                oauth_config,
2456                None,
2457                Some(component_ref_owned.clone()),
2458                true,
2459                runtime_config_non_secret,
2460                runtime_refs,
2461            )?;
2462            let store_state = ComponentState::new(host_state, wasi_policy)?;
2463            let mut store = wasmtime::Store::new(&engine, store_state);
2464
2465            let pre_instance = linker.instantiate_pre(component.as_ref())?;
2466            let pre = match InstanceIdentityPre::<ComponentState>::new(pre_instance) {
2467                Ok(pre) => pre,
2468                Err(err) if is_missing_export_error(&format!("{err:#}")) => {
2469                    return Ok(IdentifyOutcome::Unsupported);
2470                }
2471                Err(err) => return Err(err.into()),
2472            };
2473            let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2474            let api = bindings.greentic_provider_instance_identity_instance_identity_api();
2475            let result = api.call_identify_instance(&mut store, &payload)?;
2476            Ok(match result {
2477                Some(id) => IdentifyOutcome::Identified(id),
2478                None => IdentifyOutcome::NoMatch,
2479            })
2480        })
2481    }
2482
2483    /// Call the provider component's `describe-identify-instance` export
2484    /// (`greentic:provider-instance-identity/instance-identity-describe@0.1.0`)
2485    /// and parse the returned JSON into an [`IdentifyInstanceHint`].
2486    ///
2487    /// Returns `Ok(None)` for every "no hint available" case: the
2488    /// component does not export the describe world, the export returned
2489    /// `none`, the returned bytes are not valid JSON, or the `version`
2490    /// gate failed. The two malformed cases are warn-logged so a typo'd
2491    /// hint surfaces in operator logs without blocking ingest. Component
2492    /// traps and other infrastructure errors propagate as `Err`.
2493    ///
2494    /// This is the uncached probe — see [`resolve_identify_hint`] for the
2495    /// cached wrapper that callers SHOULD use on the inbound hot path.
2496    ///
2497    /// [`resolve_identify_hint`]: PackRuntime::resolve_identify_hint
2498    pub async fn invoke_describe_identify_instance(
2499        &self,
2500        binding: &ProviderBinding,
2501    ) -> Result<Option<IdentifyInstanceHint>> {
2502        let component_ref_owned = binding.component_ref.clone();
2503        let pack_component = self.components.get(&component_ref_owned).with_context(|| {
2504            format!("provider component '{component_ref_owned}' not found in pack")
2505        })?;
2506        let component = pack_component.component.clone();
2507
2508        let engine = self.engine.clone();
2509        let config = Arc::clone(&self.config);
2510        let http_client = Arc::clone(&self.http_client);
2511        let mocks = self.mocks.clone();
2512        let session_store = self.session_store.clone();
2513        let state_store = self.state_store.clone();
2514        let secrets = Arc::clone(&self.secrets);
2515        let oauth_config = self.oauth_config.clone();
2516        let pack_id = self.metadata().pack_id.clone();
2517
2518        // Locked-down WASI policy — same rationale as
2519        // `invoke_identify_instance`. See [`register_identity_probe`] docs.
2520        let wasi_policy = Arc::new(RunnerWasiPolicy::probe());
2521        let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2522        let runtime_refs = self.runtime_refs.clone();
2523        run_on_wasi_thread("provider.describe_identify_instance", move || {
2524            let mut linker = Linker::new(&engine);
2525            register_identity_probe(&mut linker)?;
2526            let host_state = HostState::new(
2527                pack_id.clone(),
2528                config,
2529                http_client,
2530                mocks,
2531                session_store,
2532                state_store,
2533                secrets,
2534                oauth_config,
2535                None,
2536                Some(component_ref_owned.clone()),
2537                true,
2538                runtime_config_non_secret,
2539                runtime_refs,
2540            )?;
2541            let store_state = ComponentState::new(host_state, wasi_policy)?;
2542            let mut store = wasmtime::Store::new(&engine, store_state);
2543
2544            let pre_instance = linker.instantiate_pre(component.as_ref())?;
2545            let pre = match InstanceIdentityDescribePre::<ComponentState>::new(pre_instance) {
2546                Ok(pre) => pre,
2547                Err(err) if is_missing_export_error(&format!("{err:#}")) => {
2548                    return Ok(None);
2549                }
2550                Err(err) => return Err(err.into()),
2551            };
2552            let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2553            let api = bindings.greentic_provider_instance_identity_instance_identity_describe_api();
2554            let raw = api.call_describe_identify_instance(&mut store)?;
2555            let Some(bytes) = raw else {
2556                // Component exported the world but said "no hint right now".
2557                // Per the WIT contract this is equivalent to a missing
2558                // export — unhinted fallback at the caller.
2559                return Ok(None);
2560            };
2561            match IdentifyInstanceHint::from_json(&bytes) {
2562                Ok(hint) => Ok(Some(hint)),
2563                Err(err) => {
2564                    // Malformed hint or wrong version. Don't fail closed:
2565                    // the contract demands the host fall back to unhinted
2566                    // (invoke identify-instance with the global allowlist).
2567                    // Warn so the provider author can fix the hint.
2568                    warn!(
2569                        event = "provider.describe_identify_instance.malformed",
2570                        component_ref = %component_ref_owned,
2571                        error = %err,
2572                        "ignoring malformed describe-identify-instance hint; \
2573                         falling back to unhinted wrapper"
2574                    );
2575                    Ok(None)
2576                }
2577            }
2578        })
2579    }
2580
2581    /// Cached wrapper around [`invoke_describe_identify_instance`]. The
2582    /// hint for a given `binding.component_ref` is invariant across
2583    /// inbound requests within a revision (it is a function of the
2584    /// component itself, not of the payload), so we probe lazily on
2585    /// first ask and reuse thereafter. `ArcSwap`-driven revision swaps
2586    /// allocate a fresh [`PackRuntime`], naturally invalidating the cache.
2587    ///
2588    /// Returns `None` when the component does not export the describe
2589    /// world, when the probe returns no hint, or when the probe fails
2590    /// (trap, timeout, instantiation error). Failures are warn-logged
2591    /// and cached — the same trap is logged once per revision per
2592    /// component, not per request.
2593    ///
2594    /// [`invoke_describe_identify_instance`]:
2595    ///     PackRuntime::invoke_describe_identify_instance
2596    pub async fn resolve_identify_hint(
2597        &self,
2598        binding: &ProviderBinding,
2599    ) -> Option<IdentifyInstanceHint> {
2600        if let Some(cached) = self.identify_hint_cache.read().get(&binding.component_ref) {
2601            return cached.clone();
2602        }
2603        let hint = match self.invoke_describe_identify_instance(binding).await {
2604            Ok(hint) => hint,
2605            Err(err) => {
2606                warn!(
2607                    event = "provider.describe_identify_instance.failed",
2608                    component_ref = %binding.component_ref,
2609                    error = %err,
2610                    "describe-identify-instance probe failed; \
2611                     falling back to unhinted wrapper for this component"
2612                );
2613                None
2614            }
2615        };
2616        // Tolerate a concurrent populate — `insert` is idempotent on the
2617        // same (component_ref, hint) shape and the probe is pure w.r.t.
2618        // the component, so re-probing on a write-race yields identical
2619        // bytes.
2620        self.identify_hint_cache
2621            .write()
2622            .insert(binding.component_ref.clone(), hint.clone());
2623        hint
2624    }
2625
2626    /// Fan out [`resolve_identify_hint`] over each requested `provider_type`.
2627    /// Result map is keyed by `provider_type`; `None` value means the
2628    /// pack has no binding for that type OR the binding's component does
2629    /// not export the describe world (unhinted — caller forwards input
2630    /// headers unfiltered for back-compat).
2631    ///
2632    /// `provider_id`-collision errors from [`ProviderRegistry::resolve`]
2633    /// against a `provider_type` query are propagated (M1.1 invariant
2634    /// violation, malformed pack).
2635    ///
2636    /// Fan out [`resolve_identify_hint`] across requested types. `None` value
2637    /// means the pack has no binding for that type OR the binding's component
2638    /// does not export the describe world.
2639    ///
2640    /// The per-binding loop is inlined (rather than factored into a shared
2641    /// `AsyncFnMut`-based helper) deliberately: routing through an
2642    /// `AsyncFnMut` closure destabilises HRTB `Send` inference for the
2643    /// returned future, which propagates up to host-level fan-out APIs and
2644    /// from there to downstream spawned-service consumers. See the
2645    /// regression test `identify_futures_are_send` on the host.
2646    ///
2647    /// [`resolve_identify_hint`]: PackRuntime::resolve_identify_hint
2648    pub async fn describe_identify_hints_by_provider_type(
2649        &self,
2650        provider_types: &[&str],
2651    ) -> Result<HashMap<String, Option<IdentifyInstanceHint>>> {
2652        let mut out = HashMap::with_capacity(provider_types.len());
2653        let registry = match self.provider_registry_optional()? {
2654            Some(registry) => registry,
2655            None => {
2656                for ty in provider_types {
2657                    out.insert((*ty).to_string(), None);
2658                }
2659                return Ok(out);
2660            }
2661        };
2662        for ty in provider_types {
2663            let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2664                out.insert((*ty).to_string(), None);
2665                continue;
2666            };
2667            let hint = self.resolve_identify_hint(&binding).await;
2668            out.insert((*ty).to_string(), hint);
2669        }
2670        Ok(out)
2671    }
2672
2673    /// Unscoped legacy API: fan out [`invoke_identify_instance`] with the
2674    /// caller-supplied opaque `payload` bytes forwarded verbatim. No
2675    /// describe-identify-instance hint lookup, no per-provider header
2676    /// scoping. New callers should use the `_scoped` sibling for
2677    /// per-provider header allowlist scoping (Phase D).
2678    ///
2679    /// Loop inlined for the same reason as
2680    /// [`describe_identify_hints_by_provider_type`].
2681    ///
2682    /// [`invoke_identify_instance`]: PackRuntime::invoke_identify_instance
2683    /// [`describe_identify_hints_by_provider_type`]:
2684    ///     PackRuntime::describe_identify_hints_by_provider_type
2685    pub async fn identify_endpoints_by_provider_type(
2686        &self,
2687        provider_types: &[&str],
2688        payload: &[u8],
2689    ) -> Result<HashMap<String, IdentifyOutcome>> {
2690        let mut out = HashMap::with_capacity(provider_types.len());
2691        let registry = match self.provider_registry_optional()? {
2692            Some(registry) => registry,
2693            None => {
2694                for ty in provider_types {
2695                    out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2696                }
2697                return Ok(out);
2698            }
2699        };
2700        for ty in provider_types {
2701            let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2702                out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2703                continue;
2704            };
2705            let outcome = self
2706                .invoke_identify_instance(&binding, payload.to_vec())
2707                .await?;
2708            out.insert((*ty).to_string(), outcome);
2709        }
2710        Ok(out)
2711    }
2712
2713    /// Per-provider scoped variant of [`identify_endpoints_by_provider_type`].
2714    ///
2715    /// The wrapper payload is built per-binding from `(headers, body)` and
2716    /// the component's cached identify-instance hint (see
2717    /// [`resolve_identify_hint`]): hinted providers see only the headers
2718    /// their hint declares; unhinted providers see every header passed in.
2719    /// Result-map semantics match the unscoped variant.
2720    ///
2721    /// Loop inlined for the same reason as
2722    /// [`describe_identify_hints_by_provider_type`].
2723    ///
2724    /// [`identify_endpoints_by_provider_type`]:
2725    ///     PackRuntime::identify_endpoints_by_provider_type
2726    /// [`resolve_identify_hint`]: PackRuntime::resolve_identify_hint
2727    /// [`describe_identify_hints_by_provider_type`]:
2728    ///     PackRuntime::describe_identify_hints_by_provider_type
2729    pub async fn identify_endpoints_by_provider_type_scoped(
2730        &self,
2731        provider_types: &[&str],
2732        headers: &[(String, String)],
2733        body: &Value,
2734    ) -> Result<HashMap<String, IdentifyOutcome>> {
2735        let mut out = HashMap::with_capacity(provider_types.len());
2736        let registry = match self.provider_registry_optional()? {
2737            Some(registry) => registry,
2738            None => {
2739                for ty in provider_types {
2740                    out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2741                }
2742                return Ok(out);
2743            }
2744        };
2745        for ty in provider_types {
2746            let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2747                out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2748                continue;
2749            };
2750            let hint = self.resolve_identify_hint(&binding).await;
2751            let payload = build_scoped_identify_payload(headers, body, hint.as_ref());
2752            let outcome = self.invoke_identify_instance(&binding, payload).await?;
2753            out.insert((*ty).to_string(), outcome);
2754        }
2755        Ok(out)
2756    }
2757
2758    pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
2759        if let Some(registry) = self.provider_registry.read().clone() {
2760            return Ok(registry);
2761        }
2762        let manifest = self
2763            .manifest
2764            .as_ref()
2765            .context("pack manifest required for provider resolution")?;
2766        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
2767        let registry = ProviderRegistry::new(
2768            manifest,
2769            self.state_store.clone(),
2770            &self.config.tenant,
2771            &env,
2772        )?;
2773        *self.provider_registry.write() = Some(registry.clone());
2774        Ok(registry)
2775    }
2776
2777    pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
2778        if self.manifest.is_none() {
2779            return Ok(None);
2780        }
2781        Ok(Some(self.provider_registry()?))
2782    }
2783
2784    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
2785        if let Some(cache) = &self.flows {
2786            return cache
2787                .flows
2788                .get(flow_id)
2789                .cloned()
2790                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
2791        }
2792        if let Some(manifest) = &self.manifest {
2793            let entry = manifest
2794                .flows
2795                .iter()
2796                .find(|f| f.id.as_str() == flow_id)
2797                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
2798            return Ok(entry.flow.clone());
2799        }
2800        bail!("flow '{flow_id}' not available (pack exports disabled)")
2801    }
2802
2803    pub fn metadata(&self) -> &PackMetadata {
2804        &self.metadata
2805    }
2806
2807    /// Read an asset file from the pack's assets directory.
2808    ///
2809    /// Accepts paths like `assets/cards/card-a.json` or `cards/card-a.json`
2810    /// (the `assets/` prefix is stripped automatically).
2811    pub fn read_asset(&self, asset_path: &str) -> Result<Vec<u8>> {
2812        let normalized = asset_path
2813            .trim_start_matches("assets/")
2814            .trim_start_matches("/assets/");
2815        // Try assets tempdir first (extracted from archive).
2816        if let Some(tempdir) = &self.assets_tempdir {
2817            let full = tempdir.path().join("assets").join(normalized);
2818            if full.exists() {
2819                return std::fs::read(&full)
2820                    .with_context(|| format!("read asset {}", full.display()));
2821            }
2822        }
2823        // Try materialized directory.
2824        let full = self.path.join("assets").join(normalized);
2825        if full.exists() {
2826            return std::fs::read(&full).with_context(|| format!("read asset {}", full.display()));
2827        }
2828        bail!("asset not found: {}", asset_path)
2829    }
2830
2831    pub fn component_manifest(&self, component_ref: &str) -> Option<&ComponentManifest> {
2832        self.component_manifests.get(component_ref)
2833    }
2834
2835    pub fn describe_component_contract_v0_6(&self, component_ref: &str) -> Result<Option<Value>> {
2836        let pack_component = self
2837            .components
2838            .get(component_ref)
2839            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
2840        let engine = self.engine.clone();
2841        let config = Arc::clone(&self.config);
2842        let http_client = Arc::clone(&self.http_client);
2843        let mocks = self.mocks.clone();
2844        let session_store = self.session_store.clone();
2845        let state_store = self.state_store.clone();
2846        let secrets = Arc::clone(&self.secrets);
2847        let oauth_config = self.oauth_config.clone();
2848        let wasi_policy = Arc::clone(&self.wasi_policy);
2849        let pack_id = self.metadata().pack_id.clone();
2850        let allow_state_store = self.allows_state_store(component_ref);
2851        let component = pack_component.component.clone();
2852        let component_ref_owned = component_ref.to_string();
2853        let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2854        let runtime_refs = self.runtime_refs.clone();
2855
2856        run_on_wasi_thread("component.describe", move || {
2857            let mut linker = Linker::new(&engine);
2858            register_all(&mut linker, allow_state_store)?;
2859            add_component_control_to_linker(&mut linker)?;
2860
2861            let host_state = HostState::new(
2862                pack_id.clone(),
2863                config,
2864                http_client,
2865                mocks,
2866                session_store,
2867                state_store,
2868                secrets,
2869                oauth_config,
2870                None,
2871                Some(component_ref_owned),
2872                false,
2873                runtime_config_non_secret,
2874                runtime_refs,
2875            )?;
2876            let store_state = ComponentState::new(host_state, wasi_policy)?;
2877            let mut store = wasmtime::Store::new(&engine, store_state);
2878            let pre_instance = linker.instantiate_pre(&component)?;
2879            let pre = match component_api::v0_6_descriptor::ComponentV0V6V0Pre::new(pre_instance) {
2880                Ok(pre) => pre,
2881                Err(_) => return Ok(None),
2882            };
2883            let bytes = block_on(async {
2884                let bindings = pre.instantiate_async(&mut store).await?;
2885                let descriptor = bindings.greentic_component_component_descriptor();
2886                descriptor.call_describe(&mut store)
2887            })?;
2888
2889            if bytes.is_empty() {
2890                return Ok(Some(Value::Null));
2891            }
2892            if let Ok(value) = serde_cbor::from_slice::<Value>(&bytes) {
2893                return Ok(Some(value));
2894            }
2895            if let Ok(value) = serde_json::from_slice::<Value>(&bytes) {
2896                return Ok(Some(value));
2897            }
2898            if let Ok(text) = String::from_utf8(bytes) {
2899                if let Ok(value) = serde_json::from_str::<Value>(&text) {
2900                    return Ok(Some(value));
2901                }
2902                return Ok(Some(Value::String(text)));
2903            }
2904            Ok(Some(Value::Null))
2905        })
2906    }
2907
2908    pub fn load_schema_json(&self, schema_ref: &str) -> Result<Option<Value>> {
2909        let rel = normalize_schema_ref(schema_ref)?;
2910        if self.path.is_dir() {
2911            let candidate = self.path.join(&rel);
2912            if candidate.exists() {
2913                let bytes = std::fs::read(&candidate).with_context(|| {
2914                    format!("failed to read schema file {}", candidate.display())
2915                })?;
2916                let value = serde_json::from_slice::<Value>(&bytes)
2917                    .with_context(|| format!("invalid schema JSON in {}", candidate.display()))?;
2918                return Ok(Some(value));
2919            }
2920        }
2921
2922        if let Some(archive_path) = self
2923            .archive_path
2924            .as_ref()
2925            .or_else(|| path_is_gtpack(&self.path).then_some(&self.path))
2926        {
2927            let file = File::open(archive_path)
2928                .with_context(|| format!("failed to open {}", archive_path.display()))?;
2929            let mut archive = ZipArchive::new(file)
2930                .with_context(|| format!("failed to read pack {}", archive_path.display()))?;
2931            match archive.by_name(&rel) {
2932                Ok(mut entry) => {
2933                    let mut bytes = Vec::new();
2934                    entry.read_to_end(&mut bytes)?;
2935                    let value = serde_json::from_slice::<Value>(&bytes).with_context(|| {
2936                        format!("invalid schema JSON in {}:{}", archive_path.display(), rel)
2937                    })?;
2938                    Ok(Some(value))
2939                }
2940                Err(zip::result::ZipError::FileNotFound) => Ok(None),
2941                Err(err) => Err(anyhow!(err)).with_context(|| {
2942                    format!(
2943                        "failed to read schema `{}` from {}",
2944                        rel,
2945                        archive_path.display()
2946                    )
2947                }),
2948            }
2949        } else {
2950            Ok(None)
2951        }
2952    }
2953
2954    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
2955        &self.metadata.secret_requirements
2956    }
2957
2958    pub fn missing_secrets(
2959        &self,
2960        tenant_ctx: &TypesTenantCtx,
2961    ) -> Vec<greentic_types::SecretRequirement> {
2962        let env = tenant_ctx.env.as_str().to_string();
2963        let tenant = tenant_ctx.tenant.as_str().to_string();
2964        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
2965        self.required_secrets()
2966            .iter()
2967            .filter(|req| {
2968                // scope must match current context if provided
2969                if let Some(scope) = &req.scope {
2970                    if scope.env != env {
2971                        return false;
2972                    }
2973                    if scope.tenant != tenant {
2974                        return false;
2975                    }
2976                    if let Some(ref team_req) = scope.team
2977                        && team.as_ref() != Some(team_req)
2978                    {
2979                        return false;
2980                    }
2981                }
2982                let ctx = self.config.tenant_ctx();
2983                read_secret_blocking(
2984                    &self.secrets,
2985                    &ctx,
2986                    &self.metadata.pack_id,
2987                    canonicalize_secret_key(req.key.as_str()).as_str(),
2988                )
2989                .is_err()
2990            })
2991            .cloned()
2992            .collect()
2993    }
2994
2995    pub fn for_component_test(
2996        components: Vec<(String, PathBuf)>,
2997        flows: HashMap<String, FlowIR>,
2998        pack_id: &str,
2999        config: Arc<HostConfig>,
3000    ) -> Result<Self> {
3001        let engine = Engine::default();
3002        let engine_profile =
3003            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
3004        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
3005        let mut component_map = HashMap::new();
3006        for (name, path) in components {
3007            if !path.exists() {
3008                bail!("component artifact missing: {}", path.display());
3009            }
3010            let wasm_bytes = std::fs::read(&path)?;
3011            let component =
3012                Arc::new(Component::from_binary(&engine, &wasm_bytes).map_err(|err| {
3013                    anyhow!("failed to compile component {}: {err}", path.display())
3014                })?);
3015            component_map.insert(
3016                name.clone(),
3017                PackComponent {
3018                    name,
3019                    version: "0.0.0".into(),
3020                    component,
3021                },
3022            );
3023        }
3024
3025        let mut flow_map = HashMap::new();
3026        let mut descriptors = Vec::new();
3027        for (id, ir) in flows {
3028            let flow_type = ir.flow_type.clone();
3029            let flow = flow_ir_to_flow(ir)?;
3030            flow_map.insert(id.clone(), flow);
3031            descriptors.push(FlowDescriptor {
3032                id: id.clone(),
3033                flow_type,
3034                pack_id: pack_id.to_string(),
3035                profile: "test".into(),
3036                version: "0.0.0".into(),
3037                description: None,
3038            });
3039        }
3040        let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
3041        let metadata = PackMetadata {
3042            pack_id: pack_id.to_string(),
3043            version: "0.0.0".into(),
3044            entry_flows,
3045            secret_requirements: Vec::new(),
3046        };
3047        let flows_cache = PackFlows {
3048            descriptors: descriptors.clone(),
3049            flows: flow_map,
3050            metadata: metadata.clone(),
3051        };
3052
3053        Ok(Self {
3054            path: PathBuf::new(),
3055            archive_path: None,
3056            config,
3057            engine,
3058            metadata,
3059            manifest: None,
3060            legacy_manifest: None,
3061            component_manifests: HashMap::new(),
3062            mocks: None,
3063            flows: Some(flows_cache),
3064            components: component_map,
3065            http_client: Arc::clone(&HTTP_CLIENT),
3066            pre_cache: Mutex::new(HashMap::new()),
3067            session_store: None,
3068            state_store: None,
3069            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
3070            assets_tempdir: None,
3071            provider_registry: RwLock::new(None),
3072            identify_hint_cache: RwLock::new(HashMap::new()),
3073            secrets: crate::secrets::default_manager()?,
3074            oauth_config: None,
3075            cache,
3076            runtime_config_non_secret: None,
3077            runtime_refs: None,
3078        })
3079    }
3080}
3081
3082/// Resolve a flow node's component reference to the key under which the
3083/// component is actually registered, given the requested `operation` and a
3084/// `is_registered` membership predicate over the pack's component keys.
3085///
3086/// greentic-pack resolves a component node to a bare component symbol
3087/// (e.g. `ai.greentic.component-templates`) and carries the operation
3088/// separately, so the full reference is the registration key. Older,
3089/// hand-authored flows instead pack the operation into the node id
3090/// (`qa.process`) while registering the component under the bare name
3091/// (`qa`). For those, fall back to the segment before the last dot — but
3092/// ONLY when that trailing segment IS the requested operation. Without the
3093/// suffix check, a missing dotted component whose prefix happens to be a
3094/// *different* registered component (`ai.greentic.component-templates` absent,
3095/// `ai.greentic` present) would silently resolve to the wrong component and
3096/// run it with the caller's tenant/session/state/secrets. Returns the
3097/// reference unchanged when neither form matches, so the caller's
3098/// "not found" error names the original reference.
3099fn resolve_component_key<'a>(
3100    component_ref: &'a str,
3101    operation: &str,
3102    is_registered: impl Fn(&str) -> bool,
3103) -> &'a str {
3104    if is_registered(component_ref) {
3105        return component_ref;
3106    }
3107    if let Some((prefix, suffix)) = component_ref.rsplit_once('.')
3108        && suffix == operation
3109        && is_registered(prefix)
3110    {
3111        return prefix;
3112    }
3113    component_ref
3114}
3115
3116#[cfg(test)]
3117mod resolve_component_key_tests {
3118    use super::resolve_component_key;
3119    use std::collections::HashSet;
3120
3121    fn registered(keys: &[&'static str]) -> impl Fn(&str) -> bool {
3122        let set: HashSet<&'static str> = keys.iter().copied().collect();
3123        move |key: &str| set.contains(key)
3124    }
3125
3126    #[test]
3127    fn full_reference_is_used_when_registered() {
3128        // greentic-pack's resolved symbol: full ref is the registration key.
3129        let is_reg = registered(&["ai.greentic.component-templates", "ai.greentic"]);
3130        assert_eq!(
3131            resolve_component_key("ai.greentic.component-templates", "handle_message", is_reg),
3132            "ai.greentic.component-templates"
3133        );
3134    }
3135
3136    #[test]
3137    fn legacy_packed_id_falls_back_when_suffix_is_operation() {
3138        // `qa.process` packs op into the id; component registered as `qa`.
3139        let is_reg = registered(&["qa"]);
3140        assert_eq!(resolve_component_key("qa.process", "process", is_reg), "qa");
3141    }
3142
3143    #[test]
3144    fn drifted_dotted_reference_does_not_fall_back_to_prefix() {
3145        // Full symbol absent, a *different* prefix component present, and the
3146        // trailing segment is NOT the requested operation -> must not silently
3147        // resolve to the prefix; return the original so the caller errors out.
3148        let is_reg = registered(&["ai.greentic"]);
3149        assert_eq!(
3150            resolve_component_key("ai.greentic.component-templates", "handle_message", is_reg),
3151            "ai.greentic.component-templates"
3152        );
3153    }
3154
3155    #[test]
3156    fn unregistered_reference_is_returned_unchanged() {
3157        let is_reg = registered(&[]);
3158        assert_eq!(resolve_component_key("foo", "bar", is_reg), "foo");
3159    }
3160}
3161
3162fn normalize_schema_ref(schema_ref: &str) -> Result<String> {
3163    let candidate = schema_ref.trim();
3164    if candidate.is_empty() {
3165        bail!("schema ref cannot be empty");
3166    }
3167    let path = Path::new(candidate);
3168    if path.is_absolute() {
3169        bail!("schema ref must be relative: {}", schema_ref);
3170    }
3171    let mut normalized = PathBuf::new();
3172    for component in path.components() {
3173        match component {
3174            std::path::Component::Normal(part) => normalized.push(part),
3175            std::path::Component::CurDir => {}
3176            _ => bail!("schema ref must not contain traversal: {}", schema_ref),
3177        }
3178    }
3179    let normalized = normalized
3180        .to_str()
3181        .map(ToString::to_string)
3182        .ok_or_else(|| anyhow!("schema ref must be valid UTF-8"))?;
3183    if normalized.is_empty() {
3184        bail!("schema ref cannot normalize to empty path");
3185    }
3186    Ok(normalized)
3187}
3188
3189fn path_is_gtpack(path: &Path) -> bool {
3190    path.extension()
3191        .and_then(|ext| ext.to_str())
3192        .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
3193        .unwrap_or(false)
3194}
3195
3196fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
3197    let message = err.to_string();
3198    message.contains("no exported instance named")
3199        && message.contains(&format!("greentic:component/node@{version}"))
3200}
3201
3202struct PackFlows {
3203    descriptors: Vec<FlowDescriptor>,
3204    flows: HashMap<String, Flow>,
3205    metadata: PackMetadata,
3206}
3207
3208const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
3209    "greentic.pack.runtime_flow",
3210    "greentic.pack.flow_runtime",
3211    "greentic.pack.runtime_flows",
3212];
3213
3214#[derive(Debug, Deserialize)]
3215struct RuntimeFlowBundle {
3216    flows: Vec<RuntimeFlow>,
3217}
3218
3219#[derive(Debug, Deserialize)]
3220struct RuntimeFlow {
3221    id: String,
3222    #[serde(alias = "flow_type")]
3223    kind: FlowKind,
3224    #[serde(default)]
3225    schema_version: Option<String>,
3226    #[serde(default)]
3227    start: Option<String>,
3228    #[serde(default)]
3229    entrypoints: BTreeMap<String, Value>,
3230    nodes: BTreeMap<String, RuntimeNode>,
3231    #[serde(default)]
3232    metadata: Option<FlowMetadata>,
3233}
3234
3235#[derive(Debug, Deserialize)]
3236struct RuntimeNode {
3237    #[serde(alias = "component")]
3238    component_id: String,
3239    #[serde(default, alias = "operation")]
3240    operation_name: Option<String>,
3241    #[serde(default, alias = "payload", alias = "input")]
3242    operation_payload: Value,
3243    #[serde(default)]
3244    config: Value,
3245    #[serde(default)]
3246    routing: Option<Routing>,
3247    #[serde(default)]
3248    telemetry: Option<TelemetryHints>,
3249}
3250
3251fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
3252    if bytes.is_empty() {
3253        return Ok(Value::Null);
3254    }
3255    serde_json::from_slice(&bytes).or_else(|_| {
3256        String::from_utf8(bytes)
3257            .map(Value::String)
3258            .map_err(|err| anyhow!(err))
3259    })
3260}
3261
3262/// `wasmtime::component::bindgen!` returns this error shape when a
3263/// `*Pre::new(...)` call resolves a world whose required export is
3264/// absent on the component. We treat that as "component does not opt
3265/// in" and let the caller fall back to the operator's statically
3266/// declared instance. Mirrors the same pattern in `invoke_provider`
3267/// for the legacy/path schema-core fallback.
3268///
3269/// The match is intentionally narrow: the error must mention BOTH a
3270/// broad wasmtime marker (`"no exported instance named"` or
3271/// `"no exported function named"`) AND the identity-world-specific
3272/// name segment (`"instance-identity-api"`, `"identify-instance"`,
3273/// `"instance-identity-describe-api"`, or `"describe-identify-instance"`).
3274/// A component that exports the identity world with a malformed
3275/// signature or a typo'd function name will NOT be silently treated
3276/// as unsupported — it will surface as a hard error.
3277fn is_missing_export_error(message: &str) -> bool {
3278    let has_broad_marker = message.contains("no exported instance named")
3279        || message.contains("no exported function named");
3280    let has_identity_segment = message.contains("instance-identity-api")
3281        || message.contains("identify-instance")
3282        || message.contains("instance-identity-describe-api")
3283        || message.contains("describe-identify-instance");
3284    has_broad_marker && has_identity_segment
3285}
3286
3287/// Build the M1 IID.4d wrapper payload (`{ headers, body }`) scoped per
3288/// the provider's [`IdentifyInstanceHint`].
3289///
3290/// - `Some(hint)` ⇒ headers are filtered to ONLY those whose lowercase
3291///   name appears in [`hint.header_names()`](IdentifyInstanceHint::header_names).
3292///   A hint with no `Header` sources yields `"headers": []` — the
3293///   component is declaring that it identifies from the body alone.
3294/// - `None` ⇒ headers pass through unfiltered. The caller is responsible
3295///   for prefiltering (greentic-start applies a global allowlist at the
3296///   ingress boundary), so back-compat with not-yet-hinted providers
3297///   matches the pre-PR-B2 behavior exactly: every probed component
3298///   receives every allowlisted header.
3299///
3300/// `body` is forwarded verbatim regardless of hint shape. Body-path
3301/// short-circuit (using the hint's `BodyPath { json_pointer }` to skip
3302/// invoking `identify-instance` entirely) is a deliberately-deferred
3303/// Phase D follow-up — the current pass scopes the header allowlist only.
3304fn build_scoped_identify_payload(
3305    headers: &[(String, String)],
3306    body: &Value,
3307    hint: Option<&IdentifyInstanceHint>,
3308) -> Vec<u8> {
3309    let scoped_headers: Vec<&(String, String)> = match hint {
3310        // Hints carry 1-3 source headers in practice; a linear scan beats
3311        // a HashSet for that size (no hash + no allocation).
3312        Some(hint) => {
3313            let allowed = hint.header_names();
3314            headers
3315                .iter()
3316                .filter(|(name, _)| allowed.contains(&name.as_str()))
3317                .collect()
3318        }
3319        None => headers.iter().collect(),
3320    };
3321    let wrapper = serde_json::json!({
3322        "headers": scoped_headers
3323            .iter()
3324            .map(|(name, value)| serde_json::json!({ "name": name, "value": value }))
3325            .collect::<Vec<_>>(),
3326        "body": body,
3327    });
3328    serde_json::to_vec(&wrapper).expect("wrapper payload always serializes")
3329}
3330
3331#[cfg(test)]
3332mod build_scoped_identify_payload_tests {
3333    use super::*;
3334    use crate::identify_hint::HintSource;
3335    use serde_json::json;
3336
3337    fn hint(sources: Vec<HintSource>) -> IdentifyInstanceHint {
3338        IdentifyInstanceHint { sources }
3339    }
3340
3341    #[test]
3342    fn unhinted_passes_all_input_headers_through() {
3343        // Back-compat: components without describe-identify-instance must
3344        // continue to see every header the caller (greentic-start)
3345        // allowlisted. Pre-PR-B2 behavior verbatim.
3346        let headers = vec![
3347            (
3348                "x-telegram-bot-api-secret-token".into(),
3349                "telegram-tok".into(),
3350            ),
3351            ("x-future-routing-tag".into(), "abc".into()),
3352        ];
3353        let body = json!({ "update_id": 1 });
3354        let bytes = build_scoped_identify_payload(&headers, &body, None);
3355        let parsed: Value = serde_json::from_slice(&bytes).unwrap();
3356        assert_eq!(
3357            parsed["headers"],
3358            json!([
3359                { "name": "x-telegram-bot-api-secret-token", "value": "telegram-tok" },
3360                { "name": "x-future-routing-tag", "value": "abc" }
3361            ])
3362        );
3363        assert_eq!(parsed["body"], body);
3364    }
3365
3366    #[test]
3367    fn header_hint_filters_to_declared_names_only() {
3368        // Telegram-shape hint: declares one header, sees only that one.
3369        // Other allowlisted headers (e.g. a future Slack signature) MUST
3370        // NOT leak into the Telegram probe.
3371        let h = hint(vec![HintSource::Header {
3372            name: "x-telegram-bot-api-secret-token".into(),
3373        }]);
3374        let headers = vec![
3375            (
3376                "x-telegram-bot-api-secret-token".into(),
3377                "telegram-tok".into(),
3378            ),
3379            ("x-slack-signature".into(), "v0=sig".into()),
3380        ];
3381        let body = json!({});
3382        let bytes = build_scoped_identify_payload(&headers, &body, Some(&h));
3383        let parsed: Value = serde_json::from_slice(&bytes).unwrap();
3384        assert_eq!(
3385            parsed["headers"],
3386            json!([
3387                { "name": "x-telegram-bot-api-secret-token", "value": "telegram-tok" }
3388            ])
3389        );
3390    }
3391
3392    #[test]
3393    fn hints_without_header_sources_drop_all_headers() {
3394        // Body-path-only (Teams-shape) and degenerate-empty hints both yield
3395        // an empty `Header` source set; the wrapper MUST carry no headers
3396        // either way. Passing Telegram's secret token through to either is
3397        // the exact blast-radius bug PR-B2 closes.
3398        let headers = vec![(
3399            "x-telegram-bot-api-secret-token".into(),
3400            "should-not-leak".into(),
3401        )];
3402        let body = json!({ "anything": true });
3403        for h in [
3404            hint(vec![HintSource::BodyPath {
3405                json_pointer: "/recipient/id".into(),
3406            }]),
3407            hint(vec![]),
3408        ] {
3409            let bytes = build_scoped_identify_payload(&headers, &body, Some(&h));
3410            let parsed: Value = serde_json::from_slice(&bytes).unwrap();
3411            assert_eq!(parsed["headers"], json!([]), "hint={:?}", h.sources);
3412            assert_eq!(parsed["body"], body);
3413        }
3414    }
3415
3416    #[test]
3417    fn header_filter_preserves_input_order_and_dups() {
3418        // Multi-value headers and ordering matter to debuggability
3419        // (operators reading the wrapper from a probe should see the
3420        // headers in the same order they arrived). Filter is a
3421        // retain-only operation; no sort, no dedup.
3422        let h = hint(vec![HintSource::Header {
3423            name: "x-route".into(),
3424        }]);
3425        let headers = vec![
3426            ("x-route".into(), "a".into()),
3427            ("x-other".into(), "skip".into()),
3428            ("x-route".into(), "b".into()),
3429        ];
3430        let body = json!({});
3431        let bytes = build_scoped_identify_payload(&headers, &body, Some(&h));
3432        let parsed: Value = serde_json::from_slice(&bytes).unwrap();
3433        assert_eq!(
3434            parsed["headers"],
3435            json!([
3436                { "name": "x-route", "value": "a" },
3437                { "name": "x-route", "value": "b" }
3438            ])
3439        );
3440    }
3441}
3442
3443impl PackFlows {
3444    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
3445        if let Some(flows) = flows_from_runtime_extension(&manifest) {
3446            return flows;
3447        }
3448        let descriptors = manifest
3449            .flows
3450            .iter()
3451            .map(|entry| FlowDescriptor {
3452                id: entry.id.as_str().to_string(),
3453                flow_type: flow_kind_to_str(entry.kind).to_string(),
3454                pack_id: manifest.pack_id.as_str().to_string(),
3455                profile: manifest.pack_id.as_str().to_string(),
3456                version: manifest.version.to_string(),
3457                description: None,
3458            })
3459            .collect();
3460        let mut flows = HashMap::new();
3461        for entry in &manifest.flows {
3462            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
3463        }
3464        Self {
3465            metadata: PackMetadata::from_manifest(&manifest),
3466            descriptors,
3467            flows,
3468        }
3469    }
3470}
3471
3472fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
3473    let extensions = manifest.extensions.as_ref()?;
3474    let extension = extensions.iter().find_map(|(key, ext)| {
3475        if RUNTIME_FLOW_EXTENSION_IDS
3476            .iter()
3477            .any(|candidate| candidate == key)
3478        {
3479            Some(ext)
3480        } else {
3481            None
3482        }
3483    })?;
3484    let runtime_flows = match decode_runtime_flow_extension(extension) {
3485        Some(flows) if !flows.is_empty() => flows,
3486        _ => return None,
3487    };
3488
3489    let descriptors = runtime_flows
3490        .iter()
3491        .map(|flow| FlowDescriptor {
3492            id: flow.id.as_str().to_string(),
3493            flow_type: flow_kind_to_str(flow.kind).to_string(),
3494            pack_id: manifest.pack_id.as_str().to_string(),
3495            profile: manifest.pack_id.as_str().to_string(),
3496            version: manifest.version.to_string(),
3497            description: None,
3498        })
3499        .collect::<Vec<_>>();
3500    let flows = runtime_flows
3501        .into_iter()
3502        .map(|flow| (flow.id.as_str().to_string(), flow))
3503        .collect();
3504
3505    Some(PackFlows {
3506        metadata: PackMetadata::from_manifest(manifest),
3507        descriptors,
3508        flows,
3509    })
3510}
3511
3512fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
3513    let value = match extension.inline.as_ref()? {
3514        ExtensionInline::Other(value) => value.clone(),
3515        _ => return None,
3516    };
3517
3518    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
3519        return Some(collect_runtime_flows(bundle.flows));
3520    }
3521
3522    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
3523        return Some(collect_runtime_flows(flows));
3524    }
3525
3526    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
3527        return Some(flows);
3528    }
3529
3530    warn!(
3531        extension = %extension.kind,
3532        version = %extension.version,
3533        "runtime flow extension present but could not be decoded"
3534    );
3535    None
3536}
3537
3538fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
3539    flows
3540        .into_iter()
3541        .filter_map(|flow| match runtime_flow_to_flow(flow) {
3542            Ok(flow) => Some(flow),
3543            Err(err) => {
3544                warn!(error = %err, "failed to decode runtime flow");
3545                None
3546            }
3547        })
3548        .collect()
3549}
3550
3551fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
3552    let flow_id = FlowId::from_str(&runtime.id)
3553        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
3554    let mut entrypoints = runtime.entrypoints;
3555    if entrypoints.is_empty()
3556        && let Some(start) = &runtime.start
3557    {
3558        entrypoints.insert("default".into(), Value::String(start.clone()));
3559    }
3560
3561    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
3562    for (id, node) in runtime.nodes {
3563        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
3564        let component_id = ComponentId::from_str(&node.component_id)
3565            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
3566        let operation_payload = if node.config.is_null() {
3567            node.operation_payload
3568        } else {
3569            serde_json::json!({
3570                "input": node.operation_payload,
3571                "config": node.config,
3572            })
3573        };
3574        let component = FlowComponentRef {
3575            id: component_id,
3576            pack_alias: None,
3577            operation: node.operation_name,
3578        };
3579        let routing = node.routing.unwrap_or(Routing::End);
3580        let telemetry = node.telemetry.unwrap_or_default();
3581        nodes.insert(
3582            node_id.clone(),
3583            Node {
3584                id: node_id,
3585                component,
3586                input: InputMapping {
3587                    mapping: operation_payload,
3588                },
3589                output: OutputMapping {
3590                    mapping: Value::Null,
3591                },
3592                err_map: None,
3593                routing,
3594                telemetry,
3595            },
3596        );
3597    }
3598
3599    Ok(Flow {
3600        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
3601        id: flow_id,
3602        kind: runtime.kind,
3603        entrypoints,
3604        nodes,
3605        metadata: runtime.metadata.unwrap_or_default(),
3606    })
3607}
3608
3609fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
3610    match kind {
3611        greentic_types::FlowKind::Messaging => "messaging",
3612        greentic_types::FlowKind::Event => "event",
3613        greentic_types::FlowKind::ComponentConfig => "component-config",
3614        greentic_types::FlowKind::Job => "job",
3615        greentic_types::FlowKind::Http => "http",
3616    }
3617}
3618
3619fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
3620    let mut file = archive
3621        .by_name(name)
3622        .with_context(|| format!("entry {name} missing from archive"))?;
3623    let mut buf = Vec::new();
3624    file.read_to_end(&mut buf)?;
3625    Ok(buf)
3626}
3627
3628fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
3629    for node in doc.nodes.values_mut() {
3630        let Some((component_ref, payload)) = node
3631            .raw
3632            .iter()
3633            .next()
3634            .map(|(key, value)| (key.clone(), value.clone()))
3635        else {
3636            continue;
3637        };
3638        if component_ref.starts_with("emit.") {
3639            node.operation = Some(component_ref);
3640            node.payload = payload;
3641            node.raw.clear();
3642            continue;
3643        }
3644        let (target_component, operation, input, config) =
3645            infer_component_exec(&payload, &component_ref);
3646        let mut payload_obj = serde_json::Map::new();
3647        // component.exec is meta; ensure the payload carries the actual target component.
3648        payload_obj.insert("component".into(), Value::String(target_component));
3649        payload_obj.insert("operation".into(), Value::String(operation));
3650        payload_obj.insert("input".into(), input);
3651        if let Some(cfg) = config {
3652            payload_obj.insert("config".into(), cfg);
3653        }
3654        node.operation = Some("component.exec".to_string());
3655        node.payload = Value::Object(payload_obj);
3656        node.raw.clear();
3657    }
3658    doc
3659}
3660
3661fn infer_component_exec(
3662    payload: &Value,
3663    component_ref: &str,
3664) -> (String, String, Value, Option<Value>) {
3665    let default_op = if component_ref.starts_with("templating.") {
3666        "render"
3667    } else {
3668        "invoke"
3669    }
3670    .to_string();
3671
3672    if let Value::Object(map) = payload {
3673        let has_embedded_component =
3674            map.get("component").is_some() || map.get("component_ref").is_some();
3675        let op = map
3676            .get("op")
3677            .or_else(|| map.get("operation"))
3678            .and_then(Value::as_str)
3679            .map(|s| s.to_string())
3680            .unwrap_or_else(|| {
3681                if has_embedded_component {
3682                    component_ref.to_string()
3683                } else {
3684                    default_op.clone()
3685                }
3686            });
3687
3688        let mut input = map.clone();
3689        let config = input.remove("config");
3690        let canonical_input = if has_embedded_component {
3691            input.get("input").cloned()
3692        } else {
3693            None
3694        };
3695        let component = input
3696            .get("component")
3697            .or_else(|| input.get("component_ref"))
3698            .and_then(Value::as_str)
3699            .map(|s| s.to_string())
3700            .unwrap_or_else(|| component_ref.to_string());
3701        input.remove("component");
3702        input.remove("component_ref");
3703        input.remove("op");
3704        input.remove("operation");
3705        let input = canonical_input.unwrap_or(Value::Object(input));
3706        return (component, op, input, config);
3707    }
3708
3709    (component_ref.to_string(), default_op, payload.clone(), None)
3710}
3711
3712#[derive(Clone, Debug)]
3713struct ComponentSpec {
3714    id: String,
3715    version: String,
3716    legacy_path: Option<String>,
3717}
3718
3719#[derive(Clone, Debug)]
3720struct ComponentSourceInfo {
3721    digest: Option<String>,
3722    source: ComponentSourceRef,
3723    artifact: ComponentArtifactLocation,
3724    expected_wasm_sha256: Option<String>,
3725    skip_digest_verification: bool,
3726}
3727
3728#[derive(Clone, Debug)]
3729enum ComponentArtifactLocation {
3730    Inline { wasm_path: String },
3731    Remote,
3732}
3733
3734#[derive(Clone, Debug, Deserialize)]
3735struct PackLockV1 {
3736    schema_version: u32,
3737    components: Vec<PackLockComponent>,
3738}
3739
3740#[derive(Clone, Debug, Deserialize)]
3741struct PackLockComponent {
3742    name: String,
3743    #[serde(default, rename = "source_ref")]
3744    source_ref: Option<String>,
3745    #[serde(default, rename = "ref")]
3746    legacy_ref: Option<String>,
3747    #[serde(default)]
3748    component_id: Option<ComponentId>,
3749    #[serde(default)]
3750    bundled: Option<bool>,
3751    #[serde(default, rename = "bundled_path")]
3752    bundled_path: Option<String>,
3753    #[serde(default, rename = "path")]
3754    legacy_path: Option<String>,
3755    #[serde(default)]
3756    wasm_sha256: Option<String>,
3757    #[serde(default, rename = "sha256")]
3758    legacy_sha256: Option<String>,
3759    #[serde(default)]
3760    resolved_digest: Option<String>,
3761    #[serde(default)]
3762    digest: Option<String>,
3763}
3764
3765fn component_specs(
3766    manifest: Option<&greentic_types::PackManifest>,
3767    legacy_manifest: Option<&legacy_pack::PackManifest>,
3768    component_sources: Option<&ComponentSourcesV1>,
3769    pack_lock: Option<&PackLockV1>,
3770) -> Vec<ComponentSpec> {
3771    if let Some(manifest) = manifest {
3772        if !manifest.components.is_empty() {
3773            return manifest
3774                .components
3775                .iter()
3776                .map(|entry| ComponentSpec {
3777                    id: entry.id.as_str().to_string(),
3778                    version: entry.version.to_string(),
3779                    legacy_path: None,
3780                })
3781                .collect();
3782        }
3783        if let Some(lock) = pack_lock {
3784            let mut seen = HashSet::new();
3785            let mut specs = Vec::new();
3786            for entry in &lock.components {
3787                let id = entry
3788                    .component_id
3789                    .as_ref()
3790                    .map(|id| id.as_str())
3791                    .unwrap_or(entry.name.as_str());
3792                if seen.insert(id.to_string()) {
3793                    specs.push(ComponentSpec {
3794                        id: id.to_string(),
3795                        version: "0.0.0".to_string(),
3796                        legacy_path: None,
3797                    });
3798                }
3799            }
3800            return specs;
3801        }
3802        if let Some(sources) = component_sources {
3803            let mut seen = HashSet::new();
3804            let mut specs = Vec::new();
3805            for entry in &sources.components {
3806                let id = entry
3807                    .component_id
3808                    .as_ref()
3809                    .map(|id| id.as_str())
3810                    .unwrap_or(entry.name.as_str());
3811                if seen.insert(id.to_string()) {
3812                    specs.push(ComponentSpec {
3813                        id: id.to_string(),
3814                        version: "0.0.0".to_string(),
3815                        legacy_path: None,
3816                    });
3817                }
3818            }
3819            return specs;
3820        }
3821    }
3822    if let Some(legacy_manifest) = legacy_manifest {
3823        return legacy_manifest
3824            .components
3825            .iter()
3826            .map(|entry| ComponentSpec {
3827                id: entry.name.clone(),
3828                version: entry.version.to_string(),
3829                legacy_path: Some(entry.file_wasm.clone()),
3830            })
3831            .collect();
3832    }
3833    Vec::new()
3834}
3835
3836fn component_sources_table(
3837    sources: Option<&ComponentSourcesV1>,
3838) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
3839    let Some(sources) = sources else {
3840        return Ok(None);
3841    };
3842    let mut table = HashMap::new();
3843    for entry in &sources.components {
3844        let artifact = match &entry.artifact {
3845            ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
3846                wasm_path: wasm_path.clone(),
3847            },
3848            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
3849        };
3850        let info = ComponentSourceInfo {
3851            digest: Some(entry.resolved.digest.clone()),
3852            source: entry.source.clone(),
3853            artifact,
3854            expected_wasm_sha256: None,
3855            skip_digest_verification: false,
3856        };
3857        if let Some(component_id) = entry.component_id.as_ref() {
3858            table.insert(component_id.as_str().to_string(), info.clone());
3859        }
3860        table.insert(entry.name.clone(), info);
3861    }
3862    Ok(Some(table))
3863}
3864
3865fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
3866    let lock_path = if path.is_dir() {
3867        let candidate = path.join("pack.lock");
3868        if candidate.exists() {
3869            Some(candidate)
3870        } else {
3871            let candidate = path.join("pack.lock.json");
3872            candidate.exists().then_some(candidate)
3873        }
3874    } else {
3875        None
3876    };
3877    let Some(lock_path) = lock_path else {
3878        return Ok(None);
3879    };
3880    let raw = std::fs::read_to_string(&lock_path)
3881        .with_context(|| format!("failed to read {}", lock_path.display()))?;
3882    let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
3883    if lock.schema_version != 1 {
3884        bail!("pack.lock schema_version must be 1");
3885    }
3886    Ok(Some(lock))
3887}
3888
3889fn find_pack_lock_roots(
3890    pack_path: &Path,
3891    is_dir: bool,
3892    archive_hint: Option<&Path>,
3893) -> Vec<PathBuf> {
3894    if is_dir {
3895        return vec![pack_path.to_path_buf()];
3896    }
3897    let mut roots = Vec::new();
3898    if let Some(archive_path) = archive_hint {
3899        if let Some(parent) = archive_path.parent() {
3900            roots.push(parent.to_path_buf());
3901            if let Some(grandparent) = parent.parent() {
3902                roots.push(grandparent.to_path_buf());
3903            }
3904        }
3905    } else if let Some(parent) = pack_path.parent() {
3906        roots.push(parent.to_path_buf());
3907        if let Some(grandparent) = parent.parent() {
3908            roots.push(grandparent.to_path_buf());
3909        }
3910    }
3911    roots
3912}
3913
3914fn normalize_sha256(digest: &str) -> Result<String> {
3915    let trimmed = digest.trim();
3916    if trimmed.is_empty() {
3917        bail!("sha256 digest cannot be empty");
3918    }
3919    if let Some(stripped) = trimmed.strip_prefix("sha256:") {
3920        if stripped.is_empty() {
3921            bail!("sha256 digest must include hex bytes after sha256:");
3922        }
3923        return Ok(trimmed.to_string());
3924    }
3925    if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
3926        return Ok(format!("sha256:{trimmed}"));
3927    }
3928    bail!("sha256 digest must be hex or sha256:<hex>");
3929}
3930
3931fn component_sources_table_from_pack_lock(
3932    lock: &PackLockV1,
3933    allow_missing_hash: bool,
3934) -> Result<HashMap<String, ComponentSourceInfo>> {
3935    let mut table = HashMap::new();
3936    let mut names = HashSet::new();
3937    for entry in &lock.components {
3938        if !names.insert(entry.name.clone()) {
3939            bail!(
3940                "pack.lock contains duplicate component name `{}`",
3941                entry.name
3942            );
3943        }
3944        let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
3945            (Some(primary), Some(legacy)) => {
3946                if primary != legacy {
3947                    bail!(
3948                        "pack.lock component {} has conflicting refs: {} vs {}",
3949                        entry.name,
3950                        primary,
3951                        legacy
3952                    );
3953                }
3954                primary.as_str()
3955            }
3956            (Some(primary), None) => primary.as_str(),
3957            (None, Some(legacy)) => legacy.as_str(),
3958            (None, None) => {
3959                bail!("pack.lock component {} missing source_ref", entry.name);
3960            }
3961        };
3962        let source: ComponentSourceRef = source_ref
3963            .parse()
3964            .with_context(|| format!("invalid component ref `{}`", source_ref))?;
3965        let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
3966            (Some(primary), Some(legacy)) => {
3967                if primary != legacy {
3968                    bail!(
3969                        "pack.lock component {} has conflicting bundled paths: {} vs {}",
3970                        entry.name,
3971                        primary,
3972                        legacy
3973                    );
3974                }
3975                Some(primary.clone())
3976            }
3977            (Some(primary), None) => Some(primary.clone()),
3978            (None, Some(legacy)) => Some(legacy.clone()),
3979            (None, None) => None,
3980        };
3981        let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
3982        let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
3983            let wasm_path = bundled_path.ok_or_else(|| {
3984                anyhow!(
3985                    "pack.lock component {} marked bundled but bundled_path is missing",
3986                    entry.name
3987                )
3988            })?;
3989            let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
3990                (Some(primary), Some(legacy)) => {
3991                    if primary != legacy {
3992                        bail!(
3993                            "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
3994                            entry.name,
3995                            primary,
3996                            legacy
3997                        );
3998                    }
3999                    Some(primary.as_str())
4000                }
4001                (Some(primary), None) => Some(primary.as_str()),
4002                (None, Some(legacy)) => Some(legacy.as_str()),
4003                (None, None) => None,
4004            };
4005            let expected = match expected_raw {
4006                Some(value) => Some(normalize_sha256(value)?),
4007                None => None,
4008            };
4009            if expected.is_none() && !allow_missing_hash {
4010                bail!(
4011                    "pack.lock component {} missing wasm_sha256 for bundled component",
4012                    entry.name
4013                );
4014            }
4015            (
4016                ComponentArtifactLocation::Inline { wasm_path },
4017                expected.clone(),
4018                expected,
4019                allow_missing_hash && expected_raw.is_none(),
4020            )
4021        } else {
4022            if source.is_tag() {
4023                bail!(
4024                    "component {} uses tag ref {} but is not bundled; rebuild the pack",
4025                    entry.name,
4026                    source
4027                );
4028            }
4029            let expected = entry
4030                .resolved_digest
4031                .as_deref()
4032                .or(entry.digest.as_deref())
4033                .ok_or_else(|| {
4034                    anyhow!(
4035                        "pack.lock component {} missing resolved_digest for remote component",
4036                        entry.name
4037                    )
4038                })?;
4039            (
4040                ComponentArtifactLocation::Remote,
4041                Some(normalize_digest(expected)),
4042                None,
4043                false,
4044            )
4045        };
4046        let info = ComponentSourceInfo {
4047            digest,
4048            source,
4049            artifact,
4050            expected_wasm_sha256,
4051            skip_digest_verification,
4052        };
4053        if let Some(component_id) = entry.component_id.as_ref() {
4054            let key = component_id.as_str().to_string();
4055            if table.contains_key(&key) {
4056                bail!(
4057                    "pack.lock contains duplicate component id `{}`",
4058                    component_id.as_str()
4059                );
4060            }
4061            table.insert(key, info.clone());
4062        }
4063        if entry.name
4064            != entry
4065                .component_id
4066                .as_ref()
4067                .map(|id| id.as_str())
4068                .unwrap_or("")
4069        {
4070            table.insert(entry.name.clone(), info);
4071        }
4072    }
4073    Ok(table)
4074}
4075
4076fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
4077    if let Some(path) = &spec.legacy_path {
4078        return root.join(path);
4079    }
4080    root.join("components").join(format!("{}.wasm", spec.id))
4081}
4082
4083fn normalize_digest(digest: &str) -> String {
4084    if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
4085        digest.to_string()
4086    } else {
4087        format!("sha256:{digest}")
4088    }
4089}
4090
4091fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
4092    if digest.starts_with("blake3:") {
4093        let hash = blake3::hash(bytes);
4094        return Ok(format!("blake3:{}", hash.to_hex()));
4095    }
4096    let mut hasher = sha2::Sha256::new();
4097    hasher.update(bytes);
4098    Ok(format!("sha256:{}", to_hex(&hasher.finalize())))
4099}
4100
4101fn compute_sha256_digest_for(bytes: &[u8]) -> String {
4102    let mut hasher = sha2::Sha256::new();
4103    hasher.update(bytes);
4104    format!("sha256:{}", to_hex(&hasher.finalize()))
4105}
4106
4107fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
4108    let wasm_digest = digest
4109        .map(normalize_digest)
4110        .unwrap_or_else(|| compute_sha256_digest_for(bytes));
4111    ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
4112}
4113
4114async fn compile_component_with_cache(
4115    cache: &CacheManager,
4116    engine: &Engine,
4117    digest: Option<&str>,
4118    bytes: Vec<u8>,
4119) -> Result<Arc<Component>> {
4120    let key = build_artifact_key(cache, digest, &bytes);
4121    cache.get_component(engine, &key, || Ok(bytes)).await
4122}
4123
4124fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
4125    let normalized_expected = normalize_digest(expected);
4126    let actual = compute_digest_for(bytes, &normalized_expected)?;
4127    if normalize_digest(&actual) != normalized_expected {
4128        bail!(
4129            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
4130        );
4131    }
4132    Ok(())
4133}
4134
4135fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
4136    let normalized_expected = normalize_sha256(expected)?;
4137    let actual = compute_sha256_digest_for(bytes);
4138    if actual != normalized_expected {
4139        bail!(
4140            "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
4141        );
4142    }
4143    Ok(())
4144}
4145
4146fn to_hex(digest: &[u8]) -> String {
4147    digest.iter().map(|byte| format!("{byte:02x}")).collect()
4148}
4149
4150#[cfg(test)]
4151mod pack_lock_tests {
4152    use super::*;
4153    use tempfile::TempDir;
4154
4155    #[test]
4156    fn pack_lock_tag_ref_requires_bundle() {
4157        let lock = PackLockV1 {
4158            schema_version: 1,
4159            components: vec![PackLockComponent {
4160                name: "templates".to_string(),
4161                source_ref: Some("oci://registry.test/templates:latest".to_string()),
4162                legacy_ref: None,
4163                component_id: None,
4164                bundled: Some(false),
4165                bundled_path: None,
4166                legacy_path: None,
4167                wasm_sha256: None,
4168                legacy_sha256: None,
4169                resolved_digest: None,
4170                digest: None,
4171            }],
4172        };
4173        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
4174        assert!(
4175            err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
4176            "unexpected error: {err}"
4177        );
4178    }
4179
4180    #[test]
4181    fn bundled_hash_mismatch_errors() {
4182        let rt = tokio::runtime::Runtime::new().expect("runtime");
4183        let temp = TempDir::new().expect("temp dir");
4184        let engine = Engine::default();
4185        let engine_profile =
4186            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
4187        let cache_config = CacheConfig {
4188            root: temp.path().join("cache"),
4189            ..CacheConfig::default()
4190        };
4191        let cache = CacheManager::new(cache_config, engine_profile);
4192        let wasm_path = temp.path().join("component.wasm");
4193        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
4194            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
4195        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
4196        std::fs::write(&wasm_path, &bytes).expect("write temp wasm");
4197
4198        let spec = ComponentSpec {
4199            id: "qa.process".to_string(),
4200            version: "0.0.0".to_string(),
4201            legacy_path: None,
4202        };
4203        let mut missing = HashSet::new();
4204        missing.insert(spec.id.clone());
4205
4206        let mut sources = HashMap::new();
4207        sources.insert(
4208            spec.id.clone(),
4209            ComponentSourceInfo {
4210                digest: Some("sha256:deadbeef".to_string()),
4211                source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
4212                artifact: ComponentArtifactLocation::Inline {
4213                    wasm_path: "component.wasm".to_string(),
4214                },
4215                expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
4216                skip_digest_verification: false,
4217            },
4218        );
4219
4220        let mut loaded = HashMap::new();
4221        let result = rt.block_on(load_components_from_sources(
4222            &cache,
4223            &engine,
4224            &sources,
4225            &ComponentResolution::default(),
4226            &[spec],
4227            &mut missing,
4228            &mut loaded,
4229            Some(temp.path()),
4230            None,
4231        ));
4232        let err = result.unwrap_err();
4233        assert!(
4234            err.to_string().contains("bundled digest mismatch"),
4235            "unexpected error: {err}"
4236        );
4237    }
4238}
4239
4240#[cfg(test)]
4241mod pack_resolution_prop_tests {
4242    use super::*;
4243    use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
4244    use proptest::prelude::*;
4245    use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
4246    use std::collections::BTreeSet;
4247    use std::path::Path;
4248    use std::str::FromStr;
4249
4250    #[derive(Clone, Debug)]
4251    enum ResolveRequest {
4252        ById(String),
4253        ByName(String),
4254    }
4255
4256    #[derive(Clone, Debug, PartialEq, Eq)]
4257    struct ResolvedComponent {
4258        key: String,
4259        source: String,
4260        artifact: String,
4261        digest: Option<String>,
4262        expected_wasm_sha256: Option<String>,
4263        skip_digest_verification: bool,
4264    }
4265
4266    #[derive(Clone, Debug, PartialEq, Eq)]
4267    struct ResolveError {
4268        code: String,
4269        message: String,
4270        context_key: String,
4271    }
4272
4273    #[derive(Clone, Debug)]
4274    struct Scenario {
4275        pack_lock: Option<PackLockV1>,
4276        component_sources: Option<ComponentSourcesV1>,
4277        request: ResolveRequest,
4278        expected_sha256: Option<String>,
4279        bytes: Vec<u8>,
4280    }
4281
4282    fn resolve_component_test(
4283        sources: Option<&ComponentSourcesV1>,
4284        lock: Option<&PackLockV1>,
4285        request: &ResolveRequest,
4286    ) -> Result<ResolvedComponent, ResolveError> {
4287        let table = if let Some(lock) = lock {
4288            component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
4289                code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
4290                message: err.to_string(),
4291                context_key: request_key(request).to_string(),
4292            })?
4293        } else {
4294            let sources = component_sources_table(sources).map_err(|err| ResolveError {
4295                code: "component_sources_error".to_string(),
4296                message: err.to_string(),
4297                context_key: request_key(request).to_string(),
4298            })?;
4299            sources.ok_or_else(|| ResolveError {
4300                code: "missing_component_sources".to_string(),
4301                message: "component sources not provided".to_string(),
4302                context_key: request_key(request).to_string(),
4303            })?
4304        };
4305
4306        let key = request_key(request);
4307        let source = table.get(key).ok_or_else(|| ResolveError {
4308            code: "component_not_found".to_string(),
4309            message: format!("component {key} not found"),
4310            context_key: key.to_string(),
4311        })?;
4312
4313        Ok(ResolvedComponent {
4314            key: key.to_string(),
4315            source: source.source.to_string(),
4316            artifact: match source.artifact {
4317                ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
4318                ComponentArtifactLocation::Remote => "remote".to_string(),
4319            },
4320            digest: source.digest.clone(),
4321            expected_wasm_sha256: source.expected_wasm_sha256.clone(),
4322            skip_digest_verification: source.skip_digest_verification,
4323        })
4324    }
4325
4326    fn request_key(request: &ResolveRequest) -> &str {
4327        match request {
4328            ResolveRequest::ById(value) => value.as_str(),
4329            ResolveRequest::ByName(value) => value.as_str(),
4330        }
4331    }
4332
4333    fn classify_pack_lock_error(message: &str) -> &'static str {
4334        if message.contains("duplicate component name") {
4335            "duplicate_name"
4336        } else if message.contains("duplicate component id") {
4337            "duplicate_id"
4338        } else if message.contains("conflicting refs") {
4339            "conflicting_ref"
4340        } else if message.contains("conflicting bundled paths") {
4341            "conflicting_bundled_path"
4342        } else if message.contains("conflicting wasm_sha256") {
4343            "conflicting_wasm_sha256"
4344        } else if message.contains("missing source_ref") {
4345            "missing_source_ref"
4346        } else if message.contains("marked bundled but bundled_path is missing") {
4347            "missing_bundled_path"
4348        } else if message.contains("missing wasm_sha256") {
4349            "missing_wasm_sha256"
4350        } else if message.contains("tag ref") && message.contains("not bundled") {
4351            "tag_ref_requires_bundle"
4352        } else if message.contains("missing resolved_digest") {
4353            "missing_resolved_digest"
4354        } else if message.contains("invalid component ref") {
4355            "invalid_component_ref"
4356        } else if message.contains("sha256 digest") {
4357            "invalid_sha256"
4358        } else {
4359            "unknown_error"
4360        }
4361    }
4362
4363    fn known_error_codes() -> BTreeSet<&'static str> {
4364        [
4365            "component_sources_error",
4366            "missing_component_sources",
4367            "component_not_found",
4368            "duplicate_name",
4369            "duplicate_id",
4370            "conflicting_ref",
4371            "conflicting_bundled_path",
4372            "conflicting_wasm_sha256",
4373            "missing_source_ref",
4374            "missing_bundled_path",
4375            "missing_wasm_sha256",
4376            "tag_ref_requires_bundle",
4377            "missing_resolved_digest",
4378            "invalid_component_ref",
4379            "invalid_sha256",
4380            "unknown_error",
4381        ]
4382        .into_iter()
4383        .collect()
4384    }
4385
4386    fn proptest_config() -> ProptestConfig {
4387        let cases = std::env::var("PROPTEST_CASES")
4388            .ok()
4389            .and_then(|value| value.parse::<u32>().ok())
4390            .unwrap_or(128);
4391        ProptestConfig {
4392            cases,
4393            failure_persistence: None,
4394            ..ProptestConfig::default()
4395        }
4396    }
4397
4398    fn proptest_seed() -> Option<[u8; 32]> {
4399        let seed = std::env::var("PROPTEST_SEED")
4400            .ok()
4401            .and_then(|value| value.parse::<u64>().ok())?;
4402        let mut bytes = [0u8; 32];
4403        bytes[..8].copy_from_slice(&seed.to_le_bytes());
4404        Some(bytes)
4405    }
4406
4407    fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
4408        let config = ProptestConfig {
4409            cases,
4410            failure_persistence: None,
4411            ..ProptestConfig::default()
4412        };
4413        let mut runner = match seed {
4414            Some(bytes) => {
4415                TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
4416            }
4417            None => TestRunner::new(config),
4418        };
4419        runner
4420            .run(&strategy, |scenario| {
4421                run_scenario(&scenario);
4422                Ok(())
4423            })
4424            .unwrap();
4425    }
4426
4427    fn run_scenario(scenario: &Scenario) {
4428        let known_codes = known_error_codes();
4429        let first = resolve_component_test(
4430            scenario.component_sources.as_ref(),
4431            scenario.pack_lock.as_ref(),
4432            &scenario.request,
4433        );
4434        let second = resolve_component_test(
4435            scenario.component_sources.as_ref(),
4436            scenario.pack_lock.as_ref(),
4437            &scenario.request,
4438        );
4439        assert_eq!(normalize_result(&first), normalize_result(&second));
4440
4441        if let Some(lock) = scenario.pack_lock.as_ref() {
4442            let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
4443            assert_eq!(normalize_result(&first), normalize_result(&lock_only));
4444        }
4445
4446        if let Err(err) = first.as_ref() {
4447            assert!(
4448                known_codes.contains(err.code.as_str()),
4449                "unexpected error code {}: {}",
4450                err.code,
4451                err.message
4452            );
4453        }
4454
4455        if let Some(expected) = scenario.expected_sha256.as_deref() {
4456            let expected_ok =
4457                verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
4458            let actual = compute_sha256_digest_for(&scenario.bytes);
4459            if actual == normalize_sha256(expected).unwrap_or_default() {
4460                assert!(expected_ok, "expected sha256 match to succeed");
4461            } else {
4462                assert!(!expected_ok, "expected sha256 mismatch to fail");
4463            }
4464        }
4465    }
4466
4467    fn normalize_result(
4468        result: &Result<ResolvedComponent, ResolveError>,
4469    ) -> Result<ResolvedComponent, ResolveError> {
4470        match result {
4471            Ok(value) => Ok(value.clone()),
4472            Err(err) => Err(err.clone()),
4473        }
4474    }
4475
4476    fn scenario_strategy() -> impl Strategy<Value = Scenario> {
4477        let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
4478        let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
4479        let tag_ref = any::<bool>();
4480        let bundled = any::<bool>();
4481        let include_sha = any::<bool>();
4482        let include_component_id = any::<bool>();
4483        let request_by_id = any::<bool>();
4484        let use_lock = any::<bool>();
4485        let use_sources = any::<bool>();
4486        let bytes = prop::collection::vec(any::<u8>(), 1..64);
4487
4488        (
4489            name,
4490            alt_name,
4491            tag_ref,
4492            bundled,
4493            include_sha,
4494            include_component_id,
4495            request_by_id,
4496            use_lock,
4497            use_sources,
4498            bytes,
4499        )
4500            .prop_map(
4501                |(
4502                    name,
4503                    alt_name,
4504                    tag_ref,
4505                    bundled,
4506                    include_sha,
4507                    include_component_id,
4508                    request_by_id,
4509                    use_lock,
4510                    use_sources,
4511                    bytes,
4512                )| {
4513                    let component_id_str = if include_component_id {
4514                        alt_name.clone()
4515                    } else {
4516                        name.clone()
4517                    };
4518                    let component_id = ComponentId::from_str(&component_id_str).ok();
4519                    let source_ref = if tag_ref {
4520                        format!("oci://registry.test/{name}:v1")
4521                    } else {
4522                        format!(
4523                            "oci://registry.test/{name}@sha256:{}",
4524                            hex::encode([0x11u8; 32])
4525                        )
4526                    };
4527                    let expected_sha256 = if bundled && include_sha {
4528                        Some(compute_sha256_digest_for(&bytes))
4529                    } else {
4530                        None
4531                    };
4532
4533                    let lock_component = PackLockComponent {
4534                        name: name.clone(),
4535                        source_ref: Some(source_ref),
4536                        legacy_ref: None,
4537                        component_id,
4538                        bundled: Some(bundled),
4539                        bundled_path: if bundled {
4540                            Some(format!("components/{name}.wasm"))
4541                        } else {
4542                            None
4543                        },
4544                        legacy_path: None,
4545                        wasm_sha256: expected_sha256.clone(),
4546                        legacy_sha256: None,
4547                        resolved_digest: if bundled {
4548                            None
4549                        } else {
4550                            Some("sha256:deadbeef".to_string())
4551                        },
4552                        digest: None,
4553                    };
4554
4555                    let pack_lock = if use_lock {
4556                        Some(PackLockV1 {
4557                            schema_version: 1,
4558                            components: vec![lock_component],
4559                        })
4560                    } else {
4561                        None
4562                    };
4563
4564                    let component_sources = if use_sources {
4565                        Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
4566                            name: name.clone(),
4567                            component_id: ComponentId::from_str(&name).ok(),
4568                            source: ComponentSourceRef::from_str(
4569                                "oci://registry.test/component@sha256:deadbeef",
4570                            )
4571                            .expect("component ref"),
4572                            resolved: ResolvedComponentV1 {
4573                                digest: "sha256:deadbeef".to_string(),
4574                                signature: None,
4575                                signed_by: None,
4576                            },
4577                            artifact: if bundled {
4578                                ArtifactLocationV1::Inline {
4579                                    wasm_path: format!("components/{name}.wasm"),
4580                                    manifest_path: None,
4581                                }
4582                            } else {
4583                                ArtifactLocationV1::Remote
4584                            },
4585                            licensing_hint: None,
4586                            metering_hint: None,
4587                        }]))
4588                    } else {
4589                        None
4590                    };
4591
4592                    let request = if request_by_id {
4593                        ResolveRequest::ById(component_id_str.clone())
4594                    } else {
4595                        ResolveRequest::ByName(name.clone())
4596                    };
4597
4598                    Scenario {
4599                        pack_lock,
4600                        component_sources,
4601                        request,
4602                        expected_sha256,
4603                        bytes,
4604                    }
4605                },
4606            )
4607    }
4608
4609    #[test]
4610    fn pack_resolution_proptest() {
4611        let seed = proptest_seed();
4612        run_cases(scenario_strategy(), proptest_config().cases, seed);
4613    }
4614
4615    #[test]
4616    fn pack_resolution_regression_seeds() {
4617        let seeds_path =
4618            Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
4619        let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
4620        for line in raw.lines() {
4621            let line = line.trim();
4622            if line.is_empty() || line.starts_with('#') {
4623                continue;
4624            }
4625            let seed = line.parse::<u64>().expect("seed must be an integer");
4626            let mut bytes = [0u8; 32];
4627            bytes[..8].copy_from_slice(&seed.to_le_bytes());
4628            run_cases(scenario_strategy(), 1, Some(bytes));
4629        }
4630    }
4631}
4632
4633fn locate_pack_assets(
4634    materialized_root: Option<&Path>,
4635    archive_hint: Option<&Path>,
4636) -> Result<(Option<PathBuf>, Option<TempDir>)> {
4637    if let Some(root) = materialized_root {
4638        let assets = root.join("assets");
4639        if assets.is_dir() {
4640            return Ok((Some(assets), None));
4641        }
4642    }
4643    if let Some(path) = archive_hint
4644        && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
4645    {
4646        return Ok((Some(assets), Some(tempdir)));
4647    }
4648    Ok((None, None))
4649}
4650
4651fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
4652    let file =
4653        File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
4654    let mut archive =
4655        ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
4656    let temp = TempDir::new().context("failed to create temporary assets directory")?;
4657    let mut found = false;
4658    for idx in 0..archive.len() {
4659        let mut entry = archive.by_index(idx)?;
4660        let name = entry.name();
4661        if !name.starts_with("assets/") {
4662            continue;
4663        }
4664        let dest = temp.path().join(name);
4665        if name.ends_with('/') {
4666            std::fs::create_dir_all(&dest)?;
4667            found = true;
4668            continue;
4669        }
4670        if let Some(parent) = dest.parent() {
4671            std::fs::create_dir_all(parent)?;
4672        }
4673        let mut outfile = std::fs::File::create(&dest)?;
4674        std::io::copy(&mut entry, &mut outfile)?;
4675        found = true;
4676    }
4677    if found {
4678        let assets_path = temp.path().join("assets");
4679        Ok(Some((temp, assets_path)))
4680    } else {
4681        Ok(None)
4682    }
4683}
4684
4685fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
4686    let mut opts = DistOptions {
4687        allow_tags: true,
4688        ..DistOptions::default()
4689    };
4690    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
4691        opts.cache_dir = cache_dir;
4692    }
4693    if component_resolution.dist_offline {
4694        opts.offline = true;
4695    }
4696    opts
4697}
4698
4699#[allow(clippy::too_many_arguments)]
4700async fn load_components_from_sources(
4701    cache: &CacheManager,
4702    engine: &Engine,
4703    component_sources: &HashMap<String, ComponentSourceInfo>,
4704    component_resolution: &ComponentResolution,
4705    specs: &[ComponentSpec],
4706    missing: &mut HashSet<String>,
4707    into: &mut HashMap<String, PackComponent>,
4708    materialized_root: Option<&Path>,
4709    archive_hint: Option<&Path>,
4710) -> Result<()> {
4711    let mut archive = if let Some(path) = archive_hint {
4712        Some(
4713            ZipArchive::new(File::open(path)?)
4714                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
4715        )
4716    } else {
4717        None
4718    };
4719    let mut dist_client: Option<DistClient> = None;
4720
4721    for spec in specs {
4722        if !missing.contains(&spec.id) {
4723            continue;
4724        }
4725        let Some(source) = component_sources.get(&spec.id) else {
4726            continue;
4727        };
4728
4729        let bytes = match &source.artifact {
4730            ComponentArtifactLocation::Inline { wasm_path } => {
4731                if let Some(root) = materialized_root {
4732                    let path = root.join(wasm_path);
4733                    if path.exists() {
4734                        std::fs::read(&path).with_context(|| {
4735                            format!(
4736                                "failed to read inline component {} from {}",
4737                                spec.id,
4738                                path.display()
4739                            )
4740                        })?
4741                    } else if archive.is_none() {
4742                        bail!("inline component {} missing at {}", spec.id, path.display());
4743                    } else {
4744                        read_entry(
4745                            archive.as_mut().expect("archive present when needed"),
4746                            wasm_path,
4747                        )
4748                        .with_context(|| {
4749                            format!(
4750                                "inline component {} missing at {} in pack archive",
4751                                spec.id, wasm_path
4752                            )
4753                        })?
4754                    }
4755                } else if let Some(archive) = archive.as_mut() {
4756                    read_entry(archive, wasm_path).with_context(|| {
4757                        format!(
4758                            "inline component {} missing at {} in pack archive",
4759                            spec.id, wasm_path
4760                        )
4761                    })?
4762                } else {
4763                    bail!(
4764                        "inline component {} missing and no pack source available",
4765                        spec.id
4766                    );
4767                }
4768            }
4769            ComponentArtifactLocation::Remote => {
4770                if source.source.is_tag() {
4771                    bail!(
4772                        "component {} uses tag ref {} but is not bundled; rebuild the pack",
4773                        spec.id,
4774                        source.source
4775                    );
4776                }
4777                let client = dist_client.get_or_insert_with(|| {
4778                    DistClient::new(dist_options_from(component_resolution))
4779                });
4780                let reference = source.source.to_string();
4781                fault::maybe_fail_asset(&reference)
4782                    .await
4783                    .with_context(|| format!("fault injection blocked asset {reference}"))?;
4784                let digest = source.digest.as_deref().ok_or_else(|| {
4785                    anyhow!(
4786                        "component {} missing expected digest for remote component",
4787                        spec.id
4788                    )
4789                })?;
4790                let cache_path = if let Ok(cache_path) = client.fetch_digest(digest).await {
4791                    cache_path
4792                } else if component_resolution.dist_offline {
4793                    client
4794                        .fetch_digest(digest)
4795                        .await
4796                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
4797                } else {
4798                    let source = client
4799                        .parse_source(&reference)
4800                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4801                    let descriptor = client
4802                        .resolve(source, ResolvePolicy)
4803                        .await
4804                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4805                    let resolved = client
4806                        .fetch(&descriptor, CachePolicy)
4807                        .await
4808                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4809                    let expected = normalize_digest(digest);
4810                    let actual = normalize_digest(&resolved.digest);
4811                    if expected != actual {
4812                        bail!(
4813                            "component {} digest mismatch after fetch: expected {}, got {}",
4814                            spec.id,
4815                            expected,
4816                            actual
4817                        );
4818                    }
4819                    resolved.cache_path.ok_or_else(|| {
4820                        anyhow!(
4821                            "component {} resolved from {} but cache path is missing",
4822                            spec.id,
4823                            reference
4824                        )
4825                    })?
4826                };
4827                std::fs::read(&cache_path).with_context(|| {
4828                    format!(
4829                        "failed to read cached component {} from {}",
4830                        spec.id,
4831                        cache_path.display()
4832                    )
4833                })?
4834            }
4835        };
4836
4837        if let Some(expected) = source.expected_wasm_sha256.as_deref() {
4838            verify_wasm_sha256(&spec.id, expected, &bytes)?;
4839        } else if source.skip_digest_verification {
4840            let actual = compute_sha256_digest_for(&bytes);
4841            warn!(
4842                component_id = %spec.id,
4843                digest = %actual,
4844                "bundled component missing wasm_sha256; allowing due to flag"
4845            );
4846        } else {
4847            let expected = source.digest.as_deref().ok_or_else(|| {
4848                anyhow!(
4849                    "component {} missing expected digest for verification",
4850                    spec.id
4851                )
4852            })?;
4853            verify_component_digest(&spec.id, expected, &bytes)?;
4854        }
4855        let component =
4856            compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
4857                .await
4858                .with_context(|| format!("failed to compile component {}", spec.id))?;
4859        into.insert(
4860            spec.id.clone(),
4861            PackComponent {
4862                name: spec.id.clone(),
4863                version: spec.version.clone(),
4864                component,
4865            },
4866        );
4867        missing.remove(&spec.id);
4868    }
4869
4870    Ok(())
4871}
4872
4873fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
4874    match err {
4875        DistError::NotFound { reference: missing } => anyhow!(
4876            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4877            component_id,
4878            missing,
4879            reference
4880        ),
4881        DistError::Offline { reference: blocked } => anyhow!(
4882            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4883            component_id,
4884            blocked,
4885            reference
4886        ),
4887        DistError::Unauthorized { target } => anyhow!(
4888            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4889            component_id,
4890            target,
4891            reference
4892        ),
4893        other => anyhow!(
4894            "failed to resolve component {} from {}: {}",
4895            component_id,
4896            reference,
4897            other
4898        ),
4899    }
4900}
4901
4902async fn load_components_from_overrides(
4903    cache: &CacheManager,
4904    engine: &Engine,
4905    overrides: &HashMap<String, PathBuf>,
4906    specs: &[ComponentSpec],
4907    missing: &mut HashSet<String>,
4908    into: &mut HashMap<String, PackComponent>,
4909) -> Result<()> {
4910    for spec in specs {
4911        if !missing.contains(&spec.id) {
4912            continue;
4913        }
4914        let Some(path) = overrides.get(&spec.id) else {
4915            continue;
4916        };
4917        let bytes = std::fs::read(path)
4918            .with_context(|| format!("failed to read override component {}", path.display()))?;
4919        let component = compile_component_with_cache(cache, engine, None, bytes)
4920            .await
4921            .with_context(|| {
4922                format!(
4923                    "failed to compile component {} from override {}",
4924                    spec.id,
4925                    path.display()
4926                )
4927            })?;
4928        into.insert(
4929            spec.id.clone(),
4930            PackComponent {
4931                name: spec.id.clone(),
4932                version: spec.version.clone(),
4933                component,
4934            },
4935        );
4936        missing.remove(&spec.id);
4937    }
4938    Ok(())
4939}
4940
4941async fn load_components_from_dir(
4942    cache: &CacheManager,
4943    engine: &Engine,
4944    root: &Path,
4945    specs: &[ComponentSpec],
4946    missing: &mut HashSet<String>,
4947    into: &mut HashMap<String, PackComponent>,
4948) -> Result<()> {
4949    for spec in specs {
4950        if !missing.contains(&spec.id) {
4951            continue;
4952        }
4953        let path = component_path_for_spec(root, spec);
4954        if !path.exists() {
4955            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
4956            continue;
4957        }
4958        let bytes = std::fs::read(&path)
4959            .with_context(|| format!("failed to read component {}", path.display()))?;
4960        let component = compile_component_with_cache(cache, engine, None, bytes)
4961            .await
4962            .with_context(|| {
4963                format!(
4964                    "failed to compile component {} from {}",
4965                    spec.id,
4966                    path.display()
4967                )
4968            })?;
4969        into.insert(
4970            spec.id.clone(),
4971            PackComponent {
4972                name: spec.id.clone(),
4973                version: spec.version.clone(),
4974                component,
4975            },
4976        );
4977        missing.remove(&spec.id);
4978    }
4979    Ok(())
4980}
4981
4982async fn load_components_from_archive(
4983    cache: &CacheManager,
4984    engine: &Engine,
4985    path: &Path,
4986    specs: &[ComponentSpec],
4987    missing: &mut HashSet<String>,
4988    into: &mut HashMap<String, PackComponent>,
4989) -> Result<()> {
4990    let mut archive = ZipArchive::new(File::open(path)?)
4991        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
4992    for spec in specs {
4993        if !missing.contains(&spec.id) {
4994            continue;
4995        }
4996        let file_name = spec
4997            .legacy_path
4998            .clone()
4999            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
5000        let bytes = match read_entry(&mut archive, &file_name) {
5001            Ok(bytes) => bytes,
5002            Err(err) => {
5003                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
5004                continue;
5005            }
5006        };
5007        let component = compile_component_with_cache(cache, engine, None, bytes)
5008            .await
5009            .with_context(|| format!("failed to compile component {}", spec.id))?;
5010        into.insert(
5011            spec.id.clone(),
5012            PackComponent {
5013                name: spec.id.clone(),
5014                version: spec.version.clone(),
5015                component,
5016            },
5017        );
5018        missing.remove(&spec.id);
5019    }
5020    Ok(())
5021}
5022
5023#[cfg(test)]
5024mod tests {
5025    use super::*;
5026    use greentic_flow::model::{FlowDoc, NodeDoc};
5027    use indexmap::IndexMap;
5028    use serde_json::json;
5029
5030    #[test]
5031    fn normalizes_raw_component_to_component_exec() {
5032        let mut nodes = IndexMap::new();
5033        let mut raw = IndexMap::new();
5034        raw.insert(
5035            "templating.handlebars".into(),
5036            json!({ "template": "Hi {{name}}" }),
5037        );
5038        nodes.insert(
5039            "start".into(),
5040            NodeDoc {
5041                raw,
5042                routing: json!([{"out": true}]),
5043                ..Default::default()
5044            },
5045        );
5046        let doc = FlowDoc {
5047            id: "welcome".into(),
5048            title: None,
5049            description: None,
5050            flow_type: "messaging".into(),
5051            start: Some("start".into()),
5052            parameters: json!({}),
5053            tags: Vec::new(),
5054            schema_version: None,
5055            entrypoints: IndexMap::new(),
5056            meta: None,
5057            slot_schema: None,
5058            nodes,
5059        };
5060
5061        let normalized = normalize_flow_doc(doc);
5062        let node = normalized.nodes.get("start").expect("node exists");
5063        assert_eq!(node.operation.as_deref(), Some("component.exec"));
5064        assert!(node.raw.is_empty());
5065        let payload = node.payload.as_object().expect("payload object");
5066        assert_eq!(
5067            payload.get("component"),
5068            Some(&Value::String("templating.handlebars".into()))
5069        );
5070        assert_eq!(
5071            payload.get("operation"),
5072            Some(&Value::String("render".into()))
5073        );
5074        let input = payload.get("input").unwrap();
5075        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
5076    }
5077
5078    #[test]
5079    fn normalizes_canonical_operation_node_to_component_exec_with_config() {
5080        let mut nodes = IndexMap::new();
5081        let mut raw = IndexMap::new();
5082        raw.insert(
5083            "handle_message".into(),
5084            json!({
5085                "component": "oci://ghcr.io/greenticai/component/component-llm-openai:stable",
5086                "config": {
5087                    "provider": "ollama",
5088                    "base_url": "http://127.0.0.1:11434/v1",
5089                    "default_model": "llama3.2"
5090                },
5091                "input": {
5092                    "messages": [{
5093                        "role": "user",
5094                        "content": "Say hello from Ollama."
5095                    }]
5096                }
5097            }),
5098        );
5099        nodes.insert(
5100            "llm".into(),
5101            NodeDoc {
5102                raw,
5103                routing: json!([{"out": true}]),
5104                ..Default::default()
5105            },
5106        );
5107        let doc = FlowDoc {
5108            id: "ollama-repro".into(),
5109            title: None,
5110            description: None,
5111            flow_type: "messaging".into(),
5112            start: Some("llm".into()),
5113            parameters: json!({}),
5114            tags: Vec::new(),
5115            schema_version: None,
5116            entrypoints: IndexMap::new(),
5117            meta: None,
5118            slot_schema: None,
5119            nodes,
5120        };
5121
5122        let normalized = normalize_flow_doc(doc);
5123        let node = normalized.nodes.get("llm").expect("node exists");
5124        assert_eq!(node.operation.as_deref(), Some("component.exec"));
5125        assert!(node.raw.is_empty());
5126        let payload = node.payload.as_object().expect("payload object");
5127        assert_eq!(
5128            payload.get("component"),
5129            Some(&Value::String(
5130                "oci://ghcr.io/greenticai/component/component-llm-openai:stable".into()
5131            ))
5132        );
5133        assert_eq!(
5134            payload.get("operation"),
5135            Some(&Value::String("handle_message".into()))
5136        );
5137        assert_eq!(
5138            payload.get("config"),
5139            Some(&json!({
5140                "provider": "ollama",
5141                "base_url": "http://127.0.0.1:11434/v1",
5142                "default_model": "llama3.2"
5143            }))
5144        );
5145        assert_eq!(
5146            payload.get("input"),
5147            Some(&json!({
5148                "messages": [{
5149                    "role": "user",
5150                    "content": "Say hello from Ollama."
5151                }]
5152            }))
5153        );
5154    }
5155
5156    #[test]
5157    fn missing_export_error_detection_recognises_bindgen_shapes() {
5158        // Positive: identity-world missing-instance error
5159        assert!(is_missing_export_error(
5160            "instantiation: no exported instance named \
5161             `greentic:provider-instance-identity/instance-identity-api@0.1.0`"
5162        ));
5163        // Positive: identity-world missing-function error
5164        assert!(is_missing_export_error(
5165            "instantiation: no exported function named `identify-instance`"
5166        ));
5167        // Negative: unrelated trap
5168        assert!(!is_missing_export_error(
5169            "Wasm trap: out of bounds memory access"
5170        ));
5171        // Negative: a DIFFERENT world's missing export must NOT match —
5172        // e.g. schema-core missing is a hard error, not "unsupported"
5173        assert!(!is_missing_export_error(
5174            "instantiation: no exported instance named \
5175             `greentic:provider-schema-core/schema-core-api@1.0.0`"
5176        ));
5177        // Negative: broad marker present but for a non-identity function
5178        assert!(!is_missing_export_error(
5179            "instantiation: no exported function named `invoke`"
5180        ));
5181    }
5182
5183    #[test]
5184    fn identify_outcome_merge_in_follows_lattice() {
5185        let unsupported = || IdentifyOutcome::Unsupported;
5186        let no_match = || IdentifyOutcome::NoMatch;
5187        let id_a = || IdentifyOutcome::Identified("a".to_string());
5188        let id_b = || IdentifyOutcome::Identified("b".to_string());
5189
5190        // Unsupported is the floor — every other variant promotes it.
5191        let mut x = unsupported();
5192        x.merge_in(unsupported());
5193        assert_eq!(x, unsupported());
5194        let mut x = unsupported();
5195        x.merge_in(no_match());
5196        assert_eq!(x, no_match());
5197        let mut x = unsupported();
5198        x.merge_in(id_a());
5199        assert_eq!(x, id_a());
5200
5201        // NoMatch beats Unsupported but is overridable by Identified.
5202        let mut x = no_match();
5203        x.merge_in(unsupported());
5204        assert_eq!(x, no_match(), "NoMatch must not downgrade to Unsupported");
5205        let mut x = no_match();
5206        x.merge_in(no_match());
5207        assert_eq!(x, no_match());
5208        let mut x = no_match();
5209        x.merge_in(id_a());
5210        assert_eq!(x, id_a(), "Identified must override NoMatch");
5211
5212        // Identified is the top — nothing overwrites it (first id wins).
5213        let mut x = id_a();
5214        x.merge_in(unsupported());
5215        assert_eq!(x, id_a());
5216        let mut x = id_a();
5217        x.merge_in(no_match());
5218        assert_eq!(x, id_a());
5219        let mut x = id_a();
5220        x.merge_in(id_b());
5221        assert_eq!(
5222            x,
5223            id_a(),
5224            "first Identified wins; later id does not replace"
5225        );
5226    }
5227}
5228
5229#[cfg(test)]
5230mod identify_endpoints_pack_tests {
5231    use super::*;
5232    use crate::config::{
5233        FlowRetryConfig, HostConfig, OperatorPolicy, RateLimits, SecretsPolicy, StateStorePolicy,
5234        WebhookPolicy,
5235    };
5236    use crate::trace::TraceConfig;
5237    use crate::validate::ValidationConfig;
5238
5239    fn test_host_config() -> HostConfig {
5240        HostConfig {
5241            tenant: "test".to_string(),
5242            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
5243            flow_type_bindings: HashMap::new(),
5244            rate_limits: RateLimits::default(),
5245            retry: FlowRetryConfig::default(),
5246            http_enabled: false,
5247            secrets_policy: SecretsPolicy::allow_all(),
5248            state_store_policy: StateStorePolicy::default(),
5249            webhook_policy: WebhookPolicy::default(),
5250            timers: Vec::new(),
5251            oauth: None,
5252            mocks: None,
5253            pack_bindings: Vec::new(),
5254            env_passthrough: Vec::new(),
5255            trace: TraceConfig::from_env(),
5256            validation: ValidationConfig::from_env(),
5257            operator_policy: OperatorPolicy::allow_all(),
5258            fast2flow: Default::default(),
5259        }
5260    }
5261
5262    #[tokio::test]
5263    async fn no_manifest_returns_unsupported_for_all_types() {
5264        // A PackRuntime with manifest: None (e.g. legacy single-component
5265        // packs or the for_component_test constructor) has no provider
5266        // registry. Every requested type must map to Unsupported — NOT
5267        // NoMatch — so the caller knows it can fall back to the static
5268        // provider_id rather than failing closed.
5269        let pack = PackRuntime::for_component_test(
5270            Vec::new(),
5271            HashMap::new(),
5272            "test-pack",
5273            Arc::new(test_host_config()),
5274        )
5275        .expect("empty pack construction");
5276        let result = pack
5277            .identify_endpoints_by_provider_type(&["teams", "slack", "telegram"], b"{}")
5278            .await
5279            .expect("no-manifest path must succeed");
5280        assert_eq!(result.len(), 3);
5281        for ty in &["teams", "slack", "telegram"] {
5282            assert_eq!(
5283                result.get(*ty),
5284                Some(&IdentifyOutcome::Unsupported),
5285                "type '{ty}' must be Unsupported when pack has no manifest"
5286            );
5287        }
5288    }
5289
5290    #[tokio::test]
5291    async fn empty_provider_types_returns_empty_map() {
5292        let pack = PackRuntime::for_component_test(
5293            Vec::new(),
5294            HashMap::new(),
5295            "test-pack",
5296            Arc::new(test_host_config()),
5297        )
5298        .expect("empty pack construction");
5299        let result = pack
5300            .identify_endpoints_by_provider_type(&[], b"{}")
5301            .await
5302            .expect("empty types fast path");
5303        assert!(result.is_empty());
5304    }
5305}
5306
5307#[derive(Clone, Debug, Default, Serialize, Deserialize)]
5308pub struct PackMetadata {
5309    pub pack_id: String,
5310    pub version: String,
5311    #[serde(default)]
5312    pub entry_flows: Vec<String>,
5313    #[serde(default)]
5314    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
5315}
5316
5317impl PackMetadata {
5318    fn from_wasm(bytes: &[u8]) -> Option<Self> {
5319        let parser = Parser::new(0);
5320        for payload in parser.parse_all(bytes) {
5321            let payload = payload.ok()?;
5322            match payload {
5323                Payload::CustomSection(section) => {
5324                    if section.name() == "greentic.manifest"
5325                        && let Ok(meta) = Self::from_bytes(section.data())
5326                    {
5327                        return Some(meta);
5328                    }
5329                }
5330                Payload::DataSection(reader) => {
5331                    for segment in reader.into_iter().flatten() {
5332                        if let Ok(meta) = Self::from_bytes(segment.data) {
5333                            return Some(meta);
5334                        }
5335                    }
5336                }
5337                _ => {}
5338            }
5339        }
5340        None
5341    }
5342
5343    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
5344        #[derive(Deserialize)]
5345        struct RawManifest {
5346            pack_id: String,
5347            version: String,
5348            #[serde(default)]
5349            entry_flows: Vec<String>,
5350            #[serde(default)]
5351            flows: Vec<RawFlow>,
5352            #[serde(default)]
5353            secret_requirements: Vec<greentic_types::SecretRequirement>,
5354        }
5355
5356        #[derive(Deserialize)]
5357        struct RawFlow {
5358            id: String,
5359        }
5360
5361        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
5362        let mut entry_flows = if manifest.entry_flows.is_empty() {
5363            manifest.flows.iter().map(|f| f.id.clone()).collect()
5364        } else {
5365            manifest.entry_flows.clone()
5366        };
5367        entry_flows.retain(|id| !id.is_empty());
5368        Ok(Self {
5369            pack_id: manifest.pack_id,
5370            version: manifest.version,
5371            entry_flows,
5372            secret_requirements: manifest.secret_requirements,
5373        })
5374    }
5375
5376    pub fn fallback(path: &Path) -> Self {
5377        let pack_id = path
5378            .file_stem()
5379            .map(|s| s.to_string_lossy().into_owned())
5380            .unwrap_or_else(|| "unknown-pack".to_string());
5381        Self {
5382            pack_id,
5383            version: "0.0.0".to_string(),
5384            entry_flows: Vec::new(),
5385            secret_requirements: Vec::new(),
5386        }
5387    }
5388
5389    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
5390        let entry_flows = manifest
5391            .flows
5392            .iter()
5393            .map(|flow| flow.id.as_str().to_string())
5394            .collect::<Vec<_>>();
5395        Self {
5396            pack_id: manifest.pack_id.as_str().to_string(),
5397            version: manifest.version.to_string(),
5398            entry_flows,
5399            secret_requirements: manifest.secret_requirements.clone(),
5400        }
5401    }
5402}