Skip to main content

greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::identify_hint::IdentifyInstanceHint;
14use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
15use crate::provider::{ProviderBinding, ProviderRegistry};
16use crate::provider_core::{
17    schema_core::SchemaCorePre as LegacySchemaCorePre,
18    schema_core_path::SchemaCorePre as PathSchemaCorePre,
19    schema_core_schema::SchemaCorePre as SchemaSchemaCorePre,
20};
21use crate::provider_core_only;
22use crate::runtime_refs::RuntimeRefsInjection;
23use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
24use anyhow::{Context, Result, anyhow, bail};
25use futures::executor::block_on;
26use greentic_distributor_client::dist::{
27    CachePolicy, DistClient, DistError, DistOptions, ResolvePolicy,
28};
29use greentic_interfaces_wasmtime::host_helpers::v1::{
30    self as host_v1, HostFns, add_all_v1_to_linker,
31    runner_host_http::RunnerHostHttp,
32    runner_host_kv::RunnerHostKv,
33    runtime_config::{ConfigError, RuntimeConfigHost},
34    secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
35    state_store::{
36        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
37        StateStoreHost, TenantCtx as StateTenantCtx,
38    },
39    telemetry_logger::{
40        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
41        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
42        TenantCtx as TelemetryTenantCtx,
43    },
44};
45use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::http::http_client as http_client_client_alias;
46use greentic_interfaces_wasmtime::instance_identity_instance_identity_describe_v0_1::InstanceIdentityDescribePre;
47use greentic_interfaces_wasmtime::instance_identity_v0_1::InstanceIdentityPre;
48use greentic_interfaces_wasmtime::{
49    http_client_client_v1_0::greentic::interfaces_types::types as http_types_v1_0,
50    http_client_client_v1_1::greentic::interfaces_types::types as http_types_v1_1,
51};
52use greentic_pack::builder as legacy_pack;
53use greentic_types::flow::FlowHasher;
54use greentic_types::{
55    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
56    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
57    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
58    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
59    pack_manifest::ExtensionInline,
60};
61use host_v1::http_client as host_http_client;
62use host_v1::http_client::{
63    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
64    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
65    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
66    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
67};
68use indexmap::IndexMap;
69use once_cell::sync::Lazy;
70use parking_lot::{Mutex, RwLock};
71use reqwest::blocking::Client as BlockingClient;
72use runner_core::normalize_under_root;
73use serde::{Deserialize, Serialize};
74use serde_cbor;
75use serde_json::{self, Value};
76use sha2::Digest;
77use tempfile::TempDir;
78use tokio::fs;
79use wasmparser::{Parser, Payload};
80use wasmtime::{Store, StoreContextMut};
81use wasmtime_wasi_http::WasiHttpCtx;
82use wasmtime_wasi_http::p2::{
83    WasiHttpCtxView, WasiHttpView, add_only_http_to_linker_sync as add_wasi_http_to_linker,
84};
85use wasmtime_wasi_tls::p2::LinkOptions;
86use wasmtime_wasi_tls::{WasiTlsCtx, WasiTlsCtxBuilder, WasiTlsCtxView, WasiTlsView};
87use zip::ZipArchive;
88
89use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
90use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
91use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
92#[cfg(feature = "fault-injection")]
93use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
94
95use crate::config::HostConfig;
96use crate::fault;
97use crate::secrets::{
98    DynSecretsManager, canonicalize_secret_key, read_secret_blocking, write_secret_blocking,
99};
100use crate::storage::state::STATE_PREFIX;
101use crate::storage::{DynSessionStore, DynStateStore};
102use crate::verify;
103use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
104use tracing::warn;
105use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
106use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
107
108use greentic_flow::model::FlowDoc;
109
/// Runtime state for one loaded pack: where its artifacts live, the parsed
/// manifests and flows, the loaded components, and the host-side services
/// (HTTP client, stores, secrets, OAuth, runtime config) kept alongside them.
// NOTE(review): `dead_code` is allowed for the whole struct; it is not visible
// from this part of the file which fields are actually unused — confirm.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Component artifact path (wasm file).
    path: PathBuf,
    /// Optional archive (.gtpack) used to load flows/manifests.
    archive_path: Option<PathBuf>,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Engine type re-exported by `crate::runtime_wasmtime`.
    engine: Engine,
    /// Pack metadata (`PackMetadata` is defined outside this part of the file).
    metadata: PackMetadata,
    /// Pack manifest in the `greentic_types` format, when one is available.
    manifest: Option<greentic_types::PackManifest>,
    /// Pack manifest in the legacy `greentic_pack::builder` format, when one
    /// is available. Boxed.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Component manifests by string key — presumably the component id; confirm.
    component_manifests: HashMap<String, ComponentManifest>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows loaded for this pack, if any.
    flows: Option<PackFlows>,
    /// Loaded components by string key — presumably the component ref; confirm.
    components: HashMap<String, PackComponent>,
    /// Shared blocking HTTP client.
    http_client: Arc<BlockingClient>,
    /// Mutex-guarded map of `InstancePre` values by string key — presumably
    /// the component ref; confirm.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    /// Optional session store.
    session_store: Option<DynSessionStore>,
    /// Optional state store.
    state_store: Option<DynStateStore>,
    /// Shared WASI policy.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Optional temporary directory; judging by the name it holds extracted
    /// pack assets — TODO confirm.
    assets_tempdir: Option<TempDir>,
    /// Provider registry behind a read/write lock; `None` until one is set.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Per-revision lazy cache of `describe-identify-instance` results,
    /// keyed by `component_ref`. A `None` value means the component does not
    /// export the describe world (or the hint was malformed) — the
    /// caller falls back to passing input headers through unchanged. A
    /// missing key means "not probed yet", as opposed to a present `None`,
    /// which means "probed and has no hint". `ArcSwap`-driven revision swaps
    /// allocate a fresh `PackRuntime` so this cache is naturally invalidated.
    identify_hint_cache: RwLock<HashMap<String, Option<IdentifyInstanceHint>>>,
    /// Secrets manager handle.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// Cache manager from `crate::cache`.
    cache: CacheManager,
    /// `pack-config.v1.non_secret` map plumbed into each `HostState` for the
    /// `greentic:runtime-config@1.0.0` host import. Defaults to `None` when no
    /// producer (greentic-start) has materialized a `PackConfig` yet; in that
    /// case all runtime-config lookups fall through to the secrets-store
    /// compat shim.
    runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
    /// `pack-config.v1.runtime_refs` (C5): per-pack `key → URI` bindings plus
    /// the env-shared [`RuntimeRefResolver`]. Consulted by the
    /// `greentic:runtime-config@1.0.0` host import AFTER `non_secret` and
    /// BEFORE the compat shim. `None` when no producer set it yet.
    ///
    /// [`RuntimeRefResolver`]: crate::runtime_refs::RuntimeRefResolver
    runtime_refs: Option<RuntimeRefsInjection>,
}
157
/// A component belonging to a pack: its name and version strings plus a
/// shared handle to the `Component` itself.
struct PackComponent {
    /// Component name; currently unread (hence the `dead_code` allowance).
    #[allow(dead_code)]
    name: String,
    /// Component version; currently unread (hence the `dead_code` allowance).
    #[allow(dead_code)]
    version: String,
    /// Shared handle to the component.
    component: Arc<Component>,
}
165
/// Result of asking a provider component's `identify-instance` export
/// (`greentic:provider-instance-identity@0.1.0`) who a payload belongs to.
/// The WIT contract requires callers to handle each variant differently.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IdentifyOutcome {
    /// The component does not export the world — the caller falls back to
    /// the operator's statically-declared `provider_id`.
    Unsupported,
    /// The component exports the world but returned `None` — the caller MUST
    /// fail closed (401/404) and must not fall back.
    NoMatch,
    /// The component recognised the payload as belonging to this
    /// `provider_id` — the caller routes to the matching `MessagingEndpoint`.
    Identified(String),
}

impl IdentifyOutcome {
    /// Folds `other` into `self`, keeping the stronger signal according to
    /// the lattice `Identified > NoMatch > Unsupported`.
    ///
    /// Intended for callers that fan the probe out over several packs
    /// (overlays) and want the strongest outcome across all of them. On a
    /// tie `self` is kept, so the first `Identified` id seen is never
    /// replaced by a later one.
    pub fn merge_in(&mut self, other: IdentifyOutcome) {
        /// Position of an outcome in the lattice; higher wins.
        fn strength(outcome: &IdentifyOutcome) -> u8 {
            match outcome {
                IdentifyOutcome::Unsupported => 0,
                IdentifyOutcome::NoMatch => 1,
                IdentifyOutcome::Identified(_) => 2,
            }
        }

        // Strictly-greater: equal strength never overwrites the current value.
        if strength(&other) > strength(self) {
            *self = other;
        }
    }
}
199
200fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
201where
202    F: FnOnce() -> Result<T> + Send + 'static,
203    T: Send + 'static,
204{
205    let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
206    let handle = builder
207        .spawn(move || {
208            let pid = std::process::id();
209            let thread_id = std::thread::current().id();
210            let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
211            tracing::info!(
212                event = "wasmtime.thread.start",
213                task = task_name,
214                pid,
215                thread_id = ?thread_id,
216                tokio_handle_present,
217                "starting Wasmtime thread"
218            );
219            task()
220        })
221        .context("failed to spawn Wasmtime thread")?;
222    handle
223        .join()
224        .map_err(|err| {
225            let reason = if let Some(msg) = err.downcast_ref::<&str>() {
226                msg.to_string()
227            } else if let Some(msg) = err.downcast_ref::<String>() {
228                msg.clone()
229            } else {
230                "unknown panic".to_string()
231            };
232            anyhow!("Wasmtime thread panicked: {reason}")
233        })
234        .and_then(|res| res)
235}
236
/// Options describing where component artifacts may come from: a
/// materialized pack directory, explicit per-component overrides, and the
/// remote-distribution settings (offline mode, cache directory, hash policy).
///
/// `Default` yields no materialized root, no overrides, online mode, no
/// cache directory, and missing hashes disallowed.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
    pub materialized_root: Option<PathBuf>,
    /// Explicit overrides mapping component id -> wasm path.
    pub overrides: HashMap<String, PathBuf>,
    /// If true, do not fetch remote components; require cached artifacts.
    pub dist_offline: bool,
    /// Optional cache directory for resolved remote components.
    pub dist_cache_dir: Option<PathBuf>,
    /// Allow bundled components without wasm_sha256 (dev-only escape hatch).
    pub allow_missing_hash: bool,
}
250
251fn build_blocking_client() -> BlockingClient {
252    std::thread::spawn(|| {
253        BlockingClient::builder()
254            .no_proxy()
255            .build()
256            .expect("blocking client")
257    })
258    .join()
259    .expect("client build thread panicked")
260}
261
262fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
263    let (root, candidate) = if path.is_absolute() {
264        let parent = path
265            .parent()
266            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
267        let root = parent
268            .canonicalize()
269            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
270        let file = path
271            .file_name()
272            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
273        (root, PathBuf::from(file))
274    } else {
275        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
276        let base = if let Some(parent) = path.parent() {
277            cwd.join(parent)
278        } else {
279            cwd
280        };
281        let root = base
282            .canonicalize()
283            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
284        let file = path
285            .file_name()
286            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
287        (root, PathBuf::from(file))
288    };
289    let safe = normalize_under_root(&root, &candidate)?;
290    Ok((root, safe))
291}
292
/// Shared blocking HTTP client, created by `build_blocking_client` the first
/// time the static is dereferenced and handed out behind an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
294
/// Serializable summary of a flow: its id, type, owning pack, profile,
/// version, and an optional description.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow type; appears as `type` in the serialized form.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Identifier of the pack the flow belongs to.
    pub pack_id: String,
    /// Profile string associated with the flow.
    pub profile: String,
    /// Version string.
    pub version: String,
    /// Optional description; deserializes to `None` when the field is absent.
    #[serde(default)]
    pub description: Option<String>,
}
306
/// Host-side state backing the imports a component can call: HTTP, secrets,
/// state/session stores, OAuth, mocks, and runtime configuration, together
/// with the execution context of the current invocation when there is one.
pub struct HostState {
    /// Pack identifier; passed to the secrets manager when reading a secret.
    #[allow(dead_code)]
    pack_id: String,
    /// Shared host configuration (HTTP enablement, secrets policy, tenant).
    config: Arc<HostConfig>,
    /// Shared blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name used when a call supplies none; read from
    /// `GREENTIC_ENV` at construction, defaulting to `local`.
    default_env: String,
    /// Optional session store.
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real secrets/HTTP access.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager handle.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host; starts from its `Default` value.
    oauth_host: OAuthBrokerHost,
    /// Execution context of the current invocation, when available.
    exec_ctx: Option<ComponentExecCtx>,
    /// Reference of the component this state belongs to, when known.
    component_ref: Option<String>,
    /// Whether the component is a provider-core component; together with
    /// `component_ref` this decides if secret writes are allowed in
    /// provider-core-only mode.
    provider_core_component: bool,
    /// `pack-config.v1.non_secret` map for the `greentic:runtime-config@1.0.0`
    /// host import. Populated by the producer (greentic-start) from the
    /// deployed `PackConfig`; `None` when no PackConfig was published, in
    /// which case lookups fall back to the secrets-store compat shim with
    /// a once-per-process deprecation warning.
    runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
    /// `pack-config.v1.runtime_refs` (C5) injection: per-pack `key → URI`
    /// bindings plus the env-shared resolver. The host import resolves the
    /// URI on every call so the value tracks `runtime.json` hot-reloads.
    runtime_refs: Option<RuntimeRefsInjection>,
}
334
335impl HostState {
336    #[allow(clippy::default_constructed_unit_structs)]
337    #[allow(clippy::too_many_arguments)]
338    pub fn new(
339        pack_id: String,
340        config: Arc<HostConfig>,
341        http_client: Arc<BlockingClient>,
342        mocks: Option<Arc<MockLayer>>,
343        session_store: Option<DynSessionStore>,
344        state_store: Option<DynStateStore>,
345        secrets: DynSecretsManager,
346        oauth_config: Option<OAuthBrokerConfig>,
347        exec_ctx: Option<ComponentExecCtx>,
348        component_ref: Option<String>,
349        provider_core_component: bool,
350        runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
351        runtime_refs: Option<RuntimeRefsInjection>,
352    ) -> Result<Self> {
353        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
354        Ok(Self {
355            pack_id,
356            config,
357            http_client,
358            default_env,
359            session_store,
360            state_store,
361            mocks,
362            secrets,
363            oauth_config,
364            oauth_host: OAuthBrokerHost::default(),
365            exec_ctx,
366            component_ref,
367            provider_core_component,
368            runtime_config_non_secret,
369            runtime_refs,
370        })
371    }
372
373    fn instantiate_component_result(
374        linker: &mut Linker<ComponentState>,
375        store: &mut Store<ComponentState>,
376        component: &Component,
377        ctx: &ComponentExecCtx,
378        component_ref: &str,
379        operation: &str,
380        input_json: &str,
381    ) -> Result<InvokeResult> {
382        let pre_instance = linker.instantiate_pre(component)?;
383        match component_api::v0_6::ComponentPre::new(pre_instance) {
384            Ok(pre) => {
385                let envelope = component_api::envelope_v0_6(ctx, component_ref, input_json)?;
386                let operation_owned = operation.to_string();
387                let result = block_on(async {
388                    let bindings = pre.instantiate_async(&mut *store).await?;
389                    let node = bindings.greentic_component_node();
390                    node.call_invoke(&mut *store, &operation_owned, &envelope)
391                })?;
392                component_api::invoke_result_from_v0_6(result)
393            }
394            Err(err_v06) => {
395                if !is_missing_node_export(&err_v06, "0.6.0") {
396                    return Err(err_v06.into());
397                }
398                let pre_instance = linker.instantiate_pre(component)?;
399                match component_api::v0_5::ComponentPre::new(pre_instance) {
400                    Ok(pre) => {
401                        let result = block_on(async {
402                            let bindings = pre.instantiate_async(&mut *store).await?;
403                            let node = bindings.greentic_component_node();
404                            let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
405                            let operation_owned = operation.to_string();
406                            let input_owned = input_json.to_string();
407                            node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
408                        })?;
409                        Ok(component_api::invoke_result_from_v0_5(result))
410                    }
411                    Err(err) => {
412                        if !is_missing_node_export(&err, "0.5.0") {
413                            return Err(err.into());
414                        }
415                        let pre_instance = linker.instantiate_pre(component)?;
416                        match component_api::v0_4::ComponentPre::new(pre_instance) {
417                            Ok(pre) => {
418                                let result = block_on(async {
419                                    let bindings = pre.instantiate_async(&mut *store).await?;
420                                    let node = bindings.greentic_component_node();
421                                    let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
422                                    let operation_owned = operation.to_string();
423                                    let input_owned = input_json.to_string();
424                                    node.call_invoke(
425                                        &mut *store,
426                                        &ctx_v04,
427                                        &operation_owned,
428                                        &input_owned,
429                                    )
430                                })?;
431                                Ok(component_api::invoke_result_from_v0_4(result))
432                            }
433                            Err(err_v04) => {
434                                if is_missing_node_export(&err_v04, "0.4.0") {
435                                    Self::try_v06_runtime(linker, store, component, input_json)
436                                } else {
437                                    Err(err_v04.into())
438                                }
439                            }
440                        }
441                    }
442                }
443            }
444        }
445    }
446
447    /// Fallback for v0.6 components that export `component-runtime::run(input, state)`
448    /// instead of the legacy `node::invoke(ctx, op, input)`.
449    fn try_v06_runtime(
450        linker: &mut Linker<ComponentState>,
451        store: &mut Store<ComponentState>,
452        component: &Component,
453        input_json: &str,
454    ) -> Result<InvokeResult> {
455        let pre_instance = linker.instantiate_pre(component)?;
456        let pre = component_api::v0_6_runtime::ComponentV0V6RuntimePre::new(pre_instance).map_err(
457            |err| err.context("component exports neither node@0.5/0.4 nor component-runtime@0.6"),
458        )?;
459
460        let result = block_on(async {
461            let bindings = pre.instantiate_async(&mut *store).await?;
462            let runtime = bindings.greentic_component_component_runtime();
463
464            // Encode input as CBOR — the component's run() expects CBOR bytes.
465            let input_value: Value = serde_json::from_str(input_json).unwrap_or(Value::Null);
466            let input_cbor =
467                serde_cbor::to_vec(&input_value).context("encode input as CBOR for v0.6")?;
468            let empty_state = serde_cbor::to_vec(&Value::Object(Default::default()))
469                .context("encode empty state")?;
470
471            let run_result = runtime
472                .call_run(&mut *store, &input_cbor, &empty_state)
473                .map_err(|err| err.context("v0.6 component-runtime::run call failed"))?;
474
475            // Decode output CBOR to JSON.
476            let output_value: Value = serde_cbor::from_slice(&run_result.output)
477                .context("decode v0.6 run output CBOR")?;
478            let output_json = serde_json::to_string(&output_value)
479                .context("serialize v0.6 run output to JSON")?;
480
481            Ok::<_, anyhow::Error>(output_json)
482        })?;
483
484        Ok(InvokeResult::Ok(result))
485    }
486
487    fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
488        match result {
489            InvokeResult::Ok(body) => {
490                if body.is_empty() {
491                    return Ok(Value::Null);
492                }
493                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
494            }
495            InvokeResult::Err(NodeError {
496                code,
497                message,
498                retryable,
499                backoff_ms,
500                details,
501            }) => {
502                let mut obj = serde_json::Map::new();
503                obj.insert("ok".into(), Value::Bool(false));
504                let mut error = serde_json::Map::new();
505                error.insert("code".into(), Value::String(code));
506                error.insert("message".into(), Value::String(message));
507                error.insert("retryable".into(), Value::Bool(retryable));
508                if let Some(backoff) = backoff_ms {
509                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
510                }
511                if let Some(details) = details {
512                    error.insert(
513                        "details".into(),
514                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
515                    );
516                }
517                obj.insert("error".into(), Value::Object(error));
518                Ok(Value::Object(obj))
519            }
520        }
521    }
522
523    /// Build a `TenantCtx` for secrets lookups that includes the team from the
524    /// execution context. `config.tenant_ctx()` only populates env + tenant;
525    /// without this, secrets scoped to a specific team are unreachable.
526    fn secrets_tenant_ctx(&self) -> TypesTenantCtx {
527        let mut ctx = self.config.tenant_ctx();
528        if let Some(exec_ctx) = self.exec_ctx.as_ref()
529            && let Some(team) = exec_ctx.tenant.team.as_ref()
530            && let Ok(team_id) = TeamId::from_str(team)
531        {
532            ctx = ctx.with_team(Some(team_id));
533        }
534        ctx
535    }
536
537    pub fn get_secret(&self, key: &str) -> Result<String> {
538        if provider_core_only::is_enabled() {
539            bail!(provider_core_only::blocked_message("secrets"))
540        }
541        if !self.config.secrets_policy.is_allowed(key) {
542            bail!("secret {key} is not permitted by bindings policy");
543        }
544        if let Some(mock) = &self.mocks
545            && let Some(value) = mock.secrets_lookup(key)
546        {
547            return Ok(value);
548        }
549        let ctx = self.secrets_tenant_ctx();
550        let canonical_key = canonicalize_secret_key(key);
551        let bytes = read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key)
552            .context("failed to read secret from manager")?;
553        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
554        Ok(value)
555    }
556
557    fn allows_secret_write_in_provider_core_only(&self) -> bool {
558        self.provider_core_component || self.component_ref.is_none()
559    }
560
561    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
562        let tenant_raw = ctx
563            .as_ref()
564            .map(|ctx| ctx.tenant.clone())
565            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
566            .unwrap_or_else(|| self.config.tenant.clone());
567        let env_raw = ctx
568            .as_ref()
569            .map(|ctx| ctx.env.clone())
570            .unwrap_or_else(|| self.default_env.clone());
571        let tenant_id = TenantId::from_str(&tenant_raw)
572            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
573        let env_id = EnvId::from_str(&env_raw)
574            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
575        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
576        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
577            if let Some(team) = exec_ctx.tenant.team.as_ref() {
578                let team_id =
579                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
580                tenant_ctx = tenant_ctx.with_team(Some(team_id));
581            }
582            if let Some(user) = exec_ctx.tenant.user.as_ref() {
583                let user_id =
584                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
585                tenant_ctx = tenant_ctx.with_user(Some(user_id));
586            }
587            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
588            if let Some(node) = exec_ctx.node_id.as_ref() {
589                tenant_ctx = tenant_ctx.with_node(node.clone());
590            }
591            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
592                tenant_ctx = tenant_ctx.with_session(session.clone());
593            }
594            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
595        }
596
597        if let Some(ctx) = ctx {
598            if let Some(team) = ctx.team.or(ctx.team_id) {
599                let team_id =
600                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
601                tenant_ctx = tenant_ctx.with_team(Some(team_id));
602            }
603            if let Some(user) = ctx.user.or(ctx.user_id) {
604                let user_id =
605                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
606                tenant_ctx = tenant_ctx.with_user(Some(user_id));
607            }
608            if let Some(flow) = ctx.flow_id {
609                tenant_ctx = tenant_ctx.with_flow(flow);
610            }
611            if let Some(node) = ctx.node_id {
612                tenant_ctx = tenant_ctx.with_node(node);
613            }
614            if let Some(provider) = ctx.provider_id {
615                tenant_ctx = tenant_ctx.with_provider(provider);
616            }
617            if let Some(session) = ctx.session_id {
618                tenant_ctx = tenant_ctx.with_session(session);
619            }
620            tenant_ctx.trace_id = ctx.trace_id;
621        }
622        Ok(tenant_ctx)
623    }
624
625    fn send_http_request(
626        &mut self,
627        req: HttpRequest,
628        opts: Option<HttpRequestOptionsV1_1>,
629        _ctx: Option<HttpTenantCtx>,
630    ) -> Result<HttpResponse, HttpClientError> {
631        if !self.config.http_enabled {
632            return Err(HttpClientError {
633                code: "denied".into(),
634                message: "http client disabled by policy".into(),
635            });
636        }
637
638        let mut mock_state = None;
639        let raw_body = req.body.clone();
640        if let Some(mock) = &self.mocks
641            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
642        {
643            match mock.http_begin(&meta) {
644                HttpDecision::Mock(response) => {
645                    let headers = response
646                        .headers
647                        .iter()
648                        .map(|(k, v)| (k.clone(), v.clone()))
649                        .collect();
650                    return Ok(HttpResponse {
651                        status: response.status,
652                        headers,
653                        body: response.body.clone().map(|b| b.into_bytes()),
654                    });
655                }
656                HttpDecision::Deny(reason) => {
657                    return Err(HttpClientError {
658                        code: "denied".into(),
659                        message: reason,
660                    });
661                }
662                HttpDecision::Passthrough { record } => {
663                    mock_state = Some((meta, record));
664                }
665            }
666        }
667
668        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
669        let mut builder = self.http_client.request(method, &req.url);
670        for (key, value) in req.headers {
671            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
672                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
673            {
674                builder = builder.header(header, header_value);
675            }
676        }
677
678        if let Some(body) = raw_body.clone() {
679            builder = builder.body(body);
680        }
681
682        if let Some(opts) = opts {
683            if let Some(timeout_ms) = opts.timeout_ms {
684                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
685            }
686            if opts.allow_insecure == Some(true) {
687                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
688            }
689            if let Some(follow_redirects) = opts.follow_redirects
690                && !follow_redirects
691            {
692                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
693            }
694        }
695
696        let response = match builder.send() {
697            Ok(resp) => resp,
698            Err(err) => {
699                warn!(url = %req.url, error = %err, "http client request failed");
700                return Err(HttpClientError {
701                    code: "unavailable".into(),
702                    message: err.to_string(),
703                });
704            }
705        };
706
707        let status = response.status().as_u16();
708        let headers_vec = response
709            .headers()
710            .iter()
711            .map(|(k, v)| {
712                (
713                    k.as_str().to_string(),
714                    v.to_str().unwrap_or_default().to_string(),
715                )
716            })
717            .collect::<Vec<_>>();
718        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
719
720        if let Some((meta, true)) = mock_state.take()
721            && let Some(mock) = &self.mocks
722        {
723            let recorded = HttpMockResponse::new(
724                status,
725                headers_vec.clone().into_iter().collect(),
726                body_bytes
727                    .as_ref()
728                    .map(|b| String::from_utf8_lossy(b).into_owned()),
729            );
730            mock.http_record(&meta, &recorded);
731        }
732
733        Ok(HttpResponse {
734            status,
735            headers: headers_vec,
736            body: body_bytes,
737        })
738    }
739}
740
#[cfg(test)]
mod canonicalize_tests {
    // Pins the output of `canonicalize_secret_key` for a few representative
    // key spellings.
    use crate::secrets::canonicalize_secret_key;

    /// An upper-snake key comes back lower-cased with its underscores kept.
    #[test]
    fn upper_snake_to_lower_snake() {
        assert_eq!(
            canonicalize_secret_key("TELEGRAM_BOT_TOKEN"),
            "telegram_bot_token"
        );
    }

    /// Surrounding whitespace is dropped and hyphens become underscores.
    #[test]
    fn trim_and_replace_non_alphanumeric() {
        assert_eq!(
            canonicalize_secret_key("  webex-bot-token  "),
            "webex_bot_token"
        );
    }

    /// Mixed-case input is lower-cased and a run of consecutive underscores
    /// is left as-is rather than collapsed.
    #[test]
    fn preserve_existing_lower_snake_with_extra_underscores() {
        assert_eq!(canonicalize_secret_key("MiXeD__Case"), "mixed__case");
    }
}
766
767impl SecretsStoreHost for HostState {
768    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
769        if provider_core_only::is_enabled() {
770            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
771            return Err(SecretsError::Denied);
772        }
773        if !self.config.secrets_policy.is_allowed(&key) {
774            return Err(SecretsError::Denied);
775        }
776        if let Some(mock) = &self.mocks
777            && let Some(value) = mock.secrets_lookup(&key)
778        {
779            return Ok(Some(value.into_bytes()));
780        }
781        let ctx = self.secrets_tenant_ctx();
782        let canonical_key = canonicalize_secret_key(&key);
783        match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
784            Ok(bytes) => Ok(Some(bytes)),
785            Err(err) => {
786                warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
787                Err(SecretsError::NotFound)
788            }
789        }
790    }
791}
792
793impl SecretsStoreHostV1_1 for HostState {
794    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
795        if provider_core_only::is_enabled() {
796            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
797            return Err(SecretsErrorV1_1::Denied);
798        }
799        if !self.config.secrets_policy.is_allowed(&key) {
800            return Err(SecretsErrorV1_1::Denied);
801        }
802        if let Some(mock) = &self.mocks
803            && let Some(value) = mock.secrets_lookup(&key)
804        {
805            return Ok(Some(value.into_bytes()));
806        }
807        let ctx = self.secrets_tenant_ctx();
808        let canonical_key = canonicalize_secret_key(&key);
809        match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
810            Ok(bytes) => Ok(Some(bytes)),
811            Err(err) => {
812                warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
813                Err(SecretsErrorV1_1::NotFound)
814            }
815        }
816    }
817
818    fn put(&mut self, key: String, value: Vec<u8>) {
819        if key.trim().is_empty() {
820            warn!(secret = %key, "secret write blocked: empty key");
821            panic!("secret write denied for key {key}: invalid key");
822        }
823        if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
824            warn!(
825                secret = %key,
826                component = self.component_ref.as_deref().unwrap_or("<pack>"),
827                "provider-core only mode enabled; blocking secrets store write"
828            );
829            panic!("secret write denied for key {key}: provider-core-only mode");
830        }
831        if !self.config.secrets_policy.is_allowed(&key) {
832            warn!(secret = %key, "secret write denied by bindings policy");
833            panic!("secret write denied for key {key}: policy");
834        }
835        let ctx = self.secrets_tenant_ctx();
836        let canonical_key = canonicalize_secret_key(&key);
837        if let Err(err) =
838            write_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key, &value)
839        {
840            warn!(secret = %key, canonical = %canonical_key, error = %err, "secret write failed");
841            panic!("secret write failed for key {key}");
842        }
843    }
844}
845
/// Process-global set of `pack-config.v1` keys for which the compat shim has
/// already logged a deprecation warning.
///
/// `warn_compat_fallback_once` inserts into this set so that each key is
/// warned about at most once per process, no matter how many invocations
/// resolve the same legacy key.
static WARNED_COMPAT_KEYS: Lazy<Mutex<HashSet<String>>> = Lazy::new(|| Mutex::new(HashSet::new()));
851
852fn warn_compat_fallback_once(key: &str) {
853    let mut warned = WARNED_COMPAT_KEYS.lock();
854    if warned.insert(key.to_string()) {
855        warn!(
856            key = %key,
857            "runtime-config key resolved via secrets-store compat fallback; \
858             move this value into pack-config.v1.non_secret"
859        );
860    }
861}
862
863impl RuntimeConfigHost for HostState {
864    fn get(&mut self, key: String) -> Result<Option<String>, ConfigError> {
865        if key.trim().is_empty() {
866            return Err(ConfigError::InvalidKey);
867        }
868
869        // 1) Primary channel: pack-config.v1.non_secret. Values are stored as
870        //    `serde_json::Value`; the WIT contract returns UTF-8 strings
871        //    conventionally JSON-encoded, so stringify here.
872        if let Some(map) = self.runtime_config_non_secret.as_ref()
873            && let Some(value) = map.get(&key)
874        {
875            return serde_json::to_string(value).map(Some).map_err(|err| {
876                warn!(key = %key, error = %err, "runtime-config value JSON-encode failed");
877                ConfigError::Internal
878            });
879        }
880
881        // 1b) C5 channel: pack-config.v1.runtime_refs. Resolved on every call
882        //     so values track `runtime.json` hot-reloads. The per-pack `refs`
883        //     map gates which keys this channel claims; non-bound keys fall
884        //     through to the compat shim.
885        if let Some(injection) = self.runtime_refs.as_ref()
886            && let Some(uri) = injection.refs.get(&key)
887        {
888            use crate::runtime_refs::RuntimeRefResolverError;
889            return match injection.resolver.resolve(uri) {
890                Ok(Some(value)) => serde_json::to_string(&value).map(Some).map_err(|err| {
891                    warn!(key = %key, error = %err, "runtime-ref value JSON-encode failed");
892                    ConfigError::Internal
893                }),
894                Ok(None) => Ok(None),
895                Err(err @ RuntimeRefResolverError::Invalid(_)) => {
896                    warn!(key = %key, error = %err, "runtime-ref rejected");
897                    Err(ConfigError::InvalidKey)
898                }
899                Err(err @ RuntimeRefResolverError::Internal(_)) => {
900                    warn!(key = %key, error = %err, "runtime-ref resolution failed");
901                    Err(ConfigError::Internal)
902                }
903            };
904        }
905
906        // 2) Compat fallback: try the secrets-store. Warn once per key per
907        //    process so this stays visible without spamming the log.
908        match SecretsStoreHost::get(self, key.clone()) {
909            Ok(Some(bytes)) => match String::from_utf8(bytes) {
910                Ok(value) => {
911                    warn_compat_fallback_once(&key);
912                    Ok(Some(value))
913                }
914                Err(_) => {
915                    warn!(
916                        key = %key,
917                        "runtime-config compat fallback found non-UTF-8 secret bytes; \
918                         returning not-found"
919                    );
920                    Err(ConfigError::Internal)
921                }
922            },
923            Ok(None) => Ok(None),
924            Err(SecretsError::NotFound) => Ok(None),
925            Err(SecretsError::Denied) => Err(ConfigError::Denied),
926            Err(SecretsError::InvalidKey) => Err(ConfigError::InvalidKey),
927            Err(SecretsError::Internal) => Err(ConfigError::Internal),
928        }
929    }
930}
931
impl HttpClientHost for HostState {
    /// Sends `req` through `send_http_request`, passing `None` for the
    /// request options and forwarding `ctx` unchanged.
    fn send(
        &mut self,
        req: HttpRequest,
        ctx: Option<HttpTenantCtx>,
    ) -> Result<HttpResponse, HttpClientError> {
        self.send_http_request(req, None, ctx)
    }
}
941
942impl HttpClientHostV1_1 for HostState {
943    fn send(
944        &mut self,
945        req: HttpRequestV1_1,
946        opts: Option<HttpRequestOptionsV1_1>,
947        ctx: Option<HttpTenantCtxV1_1>,
948    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
949        let legacy_req = HttpRequest {
950            method: req.method,
951            url: req.url,
952            headers: req.headers,
953            body: req.body,
954        };
955        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
956            env: ctx.env,
957            tenant: ctx.tenant,
958            tenant_id: ctx.tenant_id,
959            team: ctx.team,
960            team_id: ctx.team_id,
961            user: ctx.user,
962            user_id: ctx.user_id,
963            trace_id: ctx.trace_id,
964            correlation_id: ctx.correlation_id,
965            i18n_id: ctx.i18n_id,
966            attributes: ctx.attributes,
967            session_id: ctx.session_id,
968            flow_id: ctx.flow_id,
969            node_id: ctx.node_id,
970            provider_id: ctx.provider_id,
971            deadline_ms: ctx.deadline_ms,
972            attempt: ctx.attempt,
973            idempotency_key: ctx.idempotency_key,
974            impersonation: ctx.impersonation.map(|imp| http_types_v1_0::Impersonation {
975                actor_id: imp.actor_id,
976                reason: imp.reason,
977            }),
978        });
979
980        self.send_http_request(legacy_req, opts, legacy_ctx)
981            .map(|resp| HttpResponseV1_1 {
982                status: resp.status,
983                headers: resp.headers,
984                body: resp.body,
985            })
986            .map_err(|err| HttpClientErrorV1_1 {
987                code: err.code,
988                message: err.message,
989            })
990    }
991}
992
993impl StateStoreHost for HostState {
994    fn read(
995        &mut self,
996        key: HostStateKey,
997        ctx: Option<StateTenantCtx>,
998    ) -> Result<Vec<u8>, StateError> {
999        let store = match self.state_store.as_ref() {
1000            Some(store) => store.clone(),
1001            None => {
1002                return Err(StateError {
1003                    code: "unavailable".into(),
1004                    message: "state store not configured".into(),
1005                });
1006            }
1007        };
1008        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1009            Ok(ctx) => ctx,
1010            Err(err) => {
1011                return Err(StateError {
1012                    code: "invalid-ctx".into(),
1013                    message: err.to_string(),
1014                });
1015            }
1016        };
1017        #[cfg(feature = "fault-injection")]
1018        {
1019            let exec_ctx = self.exec_ctx.as_ref();
1020            let flow_id = exec_ctx
1021                .map(|ctx| ctx.flow_id.as_str())
1022                .unwrap_or("unknown");
1023            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
1024            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
1025            let fault_ctx = FaultContext {
1026                pack_id: self.pack_id.as_str(),
1027                flow_id,
1028                node_id,
1029                attempt,
1030            };
1031            if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
1032                return Err(StateError {
1033                    code: "internal".into(),
1034                    message: err.to_string(),
1035                });
1036            }
1037        }
1038        let key = StoreStateKey::from(key);
1039        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
1040            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
1041            Ok(None) => Err(StateError {
1042                code: "not_found".into(),
1043                message: "state key not found".into(),
1044            }),
1045            Err(err) => Err(StateError {
1046                code: "internal".into(),
1047                message: err.to_string(),
1048            }),
1049        }
1050    }
1051
1052    fn write(
1053        &mut self,
1054        key: HostStateKey,
1055        bytes: Vec<u8>,
1056        ctx: Option<StateTenantCtx>,
1057    ) -> Result<StateOpAck, StateError> {
1058        let store = match self.state_store.as_ref() {
1059            Some(store) => store.clone(),
1060            None => {
1061                return Err(StateError {
1062                    code: "unavailable".into(),
1063                    message: "state store not configured".into(),
1064                });
1065            }
1066        };
1067        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1068            Ok(ctx) => ctx,
1069            Err(err) => {
1070                return Err(StateError {
1071                    code: "invalid-ctx".into(),
1072                    message: err.to_string(),
1073                });
1074            }
1075        };
1076        #[cfg(feature = "fault-injection")]
1077        {
1078            let exec_ctx = self.exec_ctx.as_ref();
1079            let flow_id = exec_ctx
1080                .map(|ctx| ctx.flow_id.as_str())
1081                .unwrap_or("unknown");
1082            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
1083            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
1084            let fault_ctx = FaultContext {
1085                pack_id: self.pack_id.as_str(),
1086                flow_id,
1087                node_id,
1088                attempt,
1089            };
1090            if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
1091                return Err(StateError {
1092                    code: "internal".into(),
1093                    message: err.to_string(),
1094                });
1095            }
1096        }
1097        let key = StoreStateKey::from(key);
1098        let value = serde_json::from_slice(&bytes)
1099            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
1100        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
1101            Ok(()) => Ok(StateOpAck::Ok),
1102            Err(err) => Err(StateError {
1103                code: "internal".into(),
1104                message: err.to_string(),
1105            }),
1106        }
1107    }
1108
1109    fn delete(
1110        &mut self,
1111        key: HostStateKey,
1112        ctx: Option<StateTenantCtx>,
1113    ) -> Result<StateOpAck, StateError> {
1114        let store = match self.state_store.as_ref() {
1115            Some(store) => store.clone(),
1116            None => {
1117                return Err(StateError {
1118                    code: "unavailable".into(),
1119                    message: "state store not configured".into(),
1120                });
1121            }
1122        };
1123        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1124            Ok(ctx) => ctx,
1125            Err(err) => {
1126                return Err(StateError {
1127                    code: "invalid-ctx".into(),
1128                    message: err.to_string(),
1129                });
1130            }
1131        };
1132        let key = StoreStateKey::from(key);
1133        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
1134            Ok(_) => Ok(StateOpAck::Ok),
1135            Err(err) => Err(StateError {
1136                code: "internal".into(),
1137                message: err.to_string(),
1138            }),
1139        }
1140    }
1141}
1142
1143impl TelemetryLoggerHost for HostState {
1144    fn log(
1145        &mut self,
1146        span: TelemetrySpanContext,
1147        fields: Vec<(String, String)>,
1148        _ctx: Option<TelemetryTenantCtx>,
1149    ) -> Result<TelemetryAck, TelemetryError> {
1150        if let Some(mock) = &self.mocks
1151            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
1152        {
1153            return Ok(TelemetryAck::Ok);
1154        }
1155        let mut map = serde_json::Map::new();
1156        for (k, v) in fields {
1157            map.insert(k, Value::String(v));
1158        }
1159        tracing::info!(
1160            tenant = %span.tenant,
1161            flow_id = %span.flow_id,
1162            node = ?span.node_id,
1163            provider = %span.provider,
1164            fields = %serde_json::Value::Object(map.clone()),
1165            "telemetry log from pack"
1166        );
1167        Ok(TelemetryAck::Ok)
1168    }
1169}
1170
1171impl RunnerHostHttp for HostState {
1172    fn request(
1173        &mut self,
1174        method: String,
1175        url: String,
1176        headers: Vec<String>,
1177        body: Option<Vec<u8>>,
1178    ) -> Result<Vec<u8>, String> {
1179        let req = HttpRequest {
1180            method,
1181            url,
1182            headers: headers
1183                .chunks(2)
1184                .filter_map(|chunk| {
1185                    if chunk.len() == 2 {
1186                        Some((chunk[0].clone(), chunk[1].clone()))
1187                    } else {
1188                        None
1189                    }
1190                })
1191                .collect(),
1192            body,
1193        };
1194        match HttpClientHost::send(self, req, None) {
1195            Ok(resp) => Ok(resp.body.unwrap_or_default()),
1196            Err(err) => Err(err.message),
1197        }
1198    }
1199}
1200
/// Stub key-value host: nothing is stored and every lookup misses.
impl RunnerHostKv for HostState {
    /// Always returns `None`; both arguments are ignored.
    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
        None
    }

    /// No-op; the value is discarded.
    fn put(&mut self, _ns: String, _key: String, _val: String) {}
}
1208
/// Result of decoding a pack's `manifest.cbor`, tagged by which manifest
/// format matched. Both variants carry the `PackFlows` built for the pack.
enum ManifestLoad {
    /// The bytes decoded as the current `greentic_types::PackManifest`.
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// The bytes decoded only as the legacy `legacy_pack::PackManifest`.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
1219
1220fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
1221    let mut archive = ZipArchive::new(File::open(path)?)
1222        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1223    let bytes = read_entry(&mut archive, "manifest.cbor")
1224        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
1225    match decode_pack_manifest(&bytes) {
1226        Ok(manifest) => {
1227            let cache = PackFlows::from_manifest(manifest.clone());
1228            Ok(ManifestLoad::New {
1229                manifest: Box::new(manifest),
1230                flows: cache,
1231            })
1232        }
1233        Err(err) => {
1234            tracing::debug!(
1235                error = %err,
1236                pack = %path.display(),
1237                "decode_pack_manifest failed for archive; falling back to legacy manifest"
1238            );
1239            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1240                .context("failed to decode legacy pack manifest from manifest.cbor")?;
1241            let flows = load_legacy_flows_from_archive(&mut archive, &legacy)?;
1242            Ok(ManifestLoad::Legacy {
1243                manifest: Box::new(legacy),
1244                flows,
1245            })
1246        }
1247    }
1248}
1249
1250fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
1251    let manifest_path = root.join("manifest.cbor");
1252    let bytes = std::fs::read(&manifest_path)
1253        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
1254    match decode_pack_manifest(&bytes) {
1255        Ok(manifest) => {
1256            let cache = PackFlows::from_manifest(manifest.clone());
1257            Ok(ManifestLoad::New {
1258                manifest: Box::new(manifest),
1259                flows: cache,
1260            })
1261        }
1262        Err(err) => {
1263            tracing::debug!(
1264                error = %err,
1265                pack = %root.display(),
1266                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
1267            );
1268            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1269                .context("failed to decode legacy pack manifest from manifest.cbor")?;
1270            let flows = load_legacy_flows_from_dir(root, &legacy)?;
1271            Ok(ManifestLoad::Legacy {
1272                manifest: Box::new(legacy),
1273                flows,
1274            })
1275        }
1276    }
1277}
1278
1279fn load_legacy_flows_from_dir(
1280    root: &Path,
1281    manifest: &legacy_pack::PackManifest,
1282) -> Result<PackFlows> {
1283    build_legacy_flows(manifest, |rel_path| {
1284        let path = root.join(rel_path);
1285        std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
1286    })
1287}
1288
1289fn load_legacy_flows_from_archive(
1290    archive: &mut ZipArchive<File>,
1291    manifest: &legacy_pack::PackManifest,
1292) -> Result<PackFlows> {
1293    build_legacy_flows(manifest, |rel_path| {
1294        read_entry(archive, rel_path).with_context(|| format!("missing flow json {}", rel_path))
1295    })
1296}
1297
1298fn build_legacy_flows(
1299    manifest: &legacy_pack::PackManifest,
1300    mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1301) -> Result<PackFlows> {
1302    let mut flows = HashMap::new();
1303    let mut descriptors = Vec::new();
1304
1305    for entry in &manifest.flows {
1306        let bytes = read_json(&entry.file_json)
1307            .with_context(|| format!("missing flow json {}", entry.file_json))?;
1308        let doc = parse_flow_doc_with_legacy_aliases(&bytes)?;
1309        let normalized = normalize_flow_doc(doc);
1310        let flow_ir = flow_doc_to_ir(normalized)?;
1311        let flow = flow_ir_to_flow(flow_ir)?;
1312
1313        descriptors.push(FlowDescriptor {
1314            id: entry.id.clone(),
1315            flow_type: entry.kind.clone(),
1316            pack_id: manifest.meta.pack_id.clone(),
1317            profile: manifest.meta.pack_id.clone(),
1318            version: manifest.meta.version.to_string(),
1319            description: None,
1320        });
1321        flows.insert(entry.id.clone(), flow);
1322    }
1323
1324    let mut entry_flows = manifest.meta.entry_flows.clone();
1325    if entry_flows.is_empty() {
1326        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1327    }
1328    let metadata = PackMetadata {
1329        pack_id: manifest.meta.pack_id.clone(),
1330        version: manifest.meta.version.to_string(),
1331        entry_flows,
1332        secret_requirements: Vec::new(),
1333    };
1334
1335    Ok(PackFlows {
1336        descriptors,
1337        flows,
1338        metadata,
1339    })
1340}
1341
1342fn parse_flow_doc_with_legacy_aliases(bytes: &[u8]) -> Result<FlowDoc> {
1343    let mut value: Value =
1344        serde_json::from_slice(bytes).context("failed to decode flow doc JSON")?;
1345    if let Some(map) = value.as_object_mut()
1346        && !map.contains_key("type")
1347        && let Some(flow_type) = map.remove("flow_type")
1348    {
1349        map.insert("type".to_string(), flow_type);
1350    }
1351    serde_json::from_value(value).context("failed to decode flow doc structure")
1352}
1353
/// Per-instance state used as the data type of the component `Linker` and
/// store context: the Greentic host state plus the WASI contexts and the
/// resource table.
pub struct ComponentState {
    /// Greentic host-side state backing the host interface implementations.
    pub host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// wasi-tls context.
    wasi_tls_ctx: WasiTlsCtx,
    /// wasi-http context.
    wasi_http_ctx: WasiHttpCtx,
    /// Resource table for this instance.
    resource_table: ResourceTable,
}
1361
1362/// Install the process-default rustls [`rustls::crypto::CryptoProvider`] exactly once.
1363///
1364/// `wasmtime-wasi-tls` 45's `RustlsProvider::default()` builds its
1365/// `rustls::ClientConfig` via `ClientConfig::builder()`, which resolves the
1366/// process-default provider. Our dependency graph enables BOTH the `ring` and
1367/// `aws_lc_rs` rustls backends, so there is no unambiguous implicit default and
1368/// the builder panics ("no process-level CryptoProvider available") the first
1369/// time [`WasiTlsCtxBuilder::build`] constructs the default provider. Install
1370/// the workspace-selected aws-lc-rs provider before that ever happens.
1371/// Idempotent: a returned `Err` means a default was already installed.
1372fn install_default_crypto_provider() {
1373    static ONCE: std::sync::Once = std::sync::Once::new();
1374    ONCE.call_once(|| {
1375        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
1376    });
1377}
1378
1379impl ComponentState {
1380    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1381        // Must run before `WasiTlsCtxBuilder::build()` below, which eagerly
1382        // constructs wasi-tls's default rustls provider.
1383        install_default_crypto_provider();
1384        let wasi_ctx = policy
1385            .instantiate()
1386            .context("failed to build WASI context")?;
1387        Ok(Self {
1388            host,
1389            wasi_ctx,
1390            wasi_tls_ctx: WasiTlsCtxBuilder::new().build(),
1391            wasi_http_ctx: WasiHttpCtx::new(),
1392            resource_table: ResourceTable::new(),
1393        })
1394    }
1395
1396    fn host_mut(&mut self) -> &mut HostState {
1397        &mut self.host
1398    }
1399
1400    fn should_cancel_host(&mut self) -> bool {
1401        false
1402    }
1403
1404    fn yield_now_host(&mut self) {
1405        // no-op cooperative yield
1406    }
1407}
1408
/// Control interface for the v0.4 component API; both methods forward to the
/// inherent `ComponentState` helpers.
impl component_api::v0_4::greentic::component::control::Host for ComponentState {
    /// Forwards to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Forwards to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
1418
/// Control interface for the v0.5 component API; both methods forward to the
/// inherent `ComponentState` helpers.
impl component_api::v0_5::greentic::component::control::Host for ComponentState {
    /// Forwards to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Forwards to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
1428
1429fn add_component_control_instance(
1430    linker: &mut Linker<ComponentState>,
1431    name: &str,
1432) -> wasmtime::Result<()> {
1433    let mut inst = linker.instance(name)?;
1434    inst.func_wrap(
1435        "should-cancel",
1436        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1437            let host = caller.data_mut();
1438            Ok((host.should_cancel_host(),))
1439        },
1440    )?;
1441    inst.func_wrap(
1442        "yield-now",
1443        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1444            let host = caller.data_mut();
1445            host.yield_now_host();
1446            Ok(())
1447        },
1448    )?;
1449    Ok(())
1450}
1451
1452fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1453    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1454    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1455    Ok(())
1456}
1457
/// Reduced-authority linker for `identify-instance` and
/// `describe-identify-instance` probes (M1 IID Phase D).
///
/// Identity probes match an inbound webhook payload against known
/// per-endpoint discriminators (Telegram secret-token header, Slack
/// signing-secret, Teams JWT issuer, …) and return an endpoint id
/// or `none`. The WIT contract is a pure projection over `(headers,
/// body)` — no outbound HTTP, no persistent state, no secrets.
///
/// # Why this delegates to `register_all` (Wasmtime eager-import constraint)
///
/// Wasmtime's [`Linker::instantiate_pre`] type-checks the component's
/// **entire** import graph eagerly — not just the imports reachable from
/// the export the caller intends to invoke. A provider component that
/// exports `instance-identity-api` alongside `schema-core-api` typically
/// also imports `http-client`, `secrets-store`, etc. for the latter.
/// If the linker omits those imports, `instantiate_pre` fails with
/// `"a matching implementation was not found in the linker"` before the
/// identity export is even checked.
///
/// The ideal probe linker would register deny-shim handlers that satisfy
/// the import graph but trap on actual invocation. That requires
/// deny-shim support in `greentic-interfaces-wasmtime` (tracked as a
/// follow-up). Until then, probes use the same linker surface as normal
/// execution, with state-store disabled.
///
/// The reduced-authority boundary is enforced at the WASI policy layer
/// instead: probe call sites construct a locked-down
/// [`RunnerWasiPolicy`](crate::wasi::RunnerWasiPolicy) with no
/// preopens, no env passthrough, and no stdio inheritance. See
/// [`RunnerWasiPolicy::probe()`](crate::wasi::RunnerWasiPolicy::probe)
/// and the `probe_wasi_policy_is_locked_down` test.
///
/// # Errors
///
/// Returns whatever error `register_all` reports while populating `linker`.
pub fn register_identity_probe(linker: &mut Linker<ComponentState>) -> Result<()> {
    // Delegates to `register_all` with state-store disabled. See doc
    // comment above for the rationale (Wasmtime eager-import validation).
    register_all(linker, false)
}
1495
#[cfg(test)]
mod register_identity_probe_tests {
    use super::*;

    /// Verify that [`register_identity_probe`] successfully links all
    /// imports needed by real provider components (wasi-core, wasi-tls,
    /// wasi-http, http-client, secrets-store, telemetry, etc.).
    ///
    /// An earlier probe linker omitted most host imports.
    /// Wasmtime's `instantiate_pre` validates the **entire** import
    /// graph eagerly, so any provider with runtime imports (all real
    /// providers) would fail before the identity export was checked.
    #[test]
    fn register_identity_probe_links_successfully() {
        // Registration on a default engine must complete without error.
        let engine = wasmtime::Engine::default();
        let mut linker = Linker::<ComponentState>::new(&engine);
        register_identity_probe(&mut linker).expect("probe linker registers all imports");
    }

    /// Verify that the probe WASI policy has no preopens, no env, and
    /// no stdio — the only reduced-authority boundary available today.
    #[test]
    fn probe_wasi_policy_is_locked_down() {
        let policy = RunnerWasiPolicy::probe();
        assert!(!policy.inherit_stdio, "probe WASI must not inherit stdio");
        assert!(
            policy.preopens.is_empty(),
            "probe WASI must have no preopens"
        );
        assert!(
            policy.env_allow.is_empty(),
            "probe WASI must not allow env vars"
        );
        assert!(
            policy.env_set.is_empty(),
            "probe WASI must not set env vars"
        );
    }
}
1535
1536pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1537    add_wasi_to_linker(linker)?;
1538
1539    // Add wasi-tls types and turn on the feature in linker
1540    let mut opts = LinkOptions::default();
1541    opts.tls(true);
1542    wasmtime_wasi_tls::p2::add_to_linker(linker, &opts)?;
1543
1544    // Add wasi-http types and turn on the feature in linker
1545    add_wasi_http_to_linker(linker)?;
1546
1547    add_all_v1_to_linker(
1548        linker,
1549        HostFns {
1550            http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1551            http_client: Some(|state: &mut ComponentState| state.host_mut()),
1552            oauth_broker: None,
1553            runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1554            runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1555            telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
1556            state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1557            secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1558            secrets_store: None,
1559            runtime_config: Some(|state: &mut ComponentState| state.host_mut()),
1560        },
1561    )?;
1562    add_http_client_client_world_aliases(linker)?;
1563    Ok(())
1564}
1565
1566fn add_http_client_client_world_aliases(linker: &mut Linker<ComponentState>) -> Result<()> {
1567    let mut inst_v1_1 = linker.instance("greentic:http/client@1.1.0")?;
1568    inst_v1_1.func_wrap(
1569        "send",
1570        move |mut caller: StoreContextMut<'_, ComponentState>,
1571              (req, opts, ctx): (
1572            http_client_client_alias::Request,
1573            Option<http_client_client_alias::RequestOptions>,
1574            Option<http_client_client_alias::TenantCtx>,
1575        )| {
1576            let host = caller.data_mut().host_mut();
1577            let result = HttpClientHostV1_1::send(
1578                host,
1579                alias_request_to_host(req),
1580                opts.map(alias_request_options_to_host),
1581                ctx.map(alias_tenant_ctx_to_host),
1582            );
1583            Ok((match result {
1584                Ok(resp) => Ok(alias_response_from_host(resp)),
1585                Err(err) => Err(alias_error_from_host(err)),
1586            },))
1587        },
1588    )?;
1589    let mut inst_v1_0 = linker.instance("greentic:http/client@1.0.0")?;
1590    inst_v1_0.func_wrap(
1591        "send",
1592        move |mut caller: StoreContextMut<'_, ComponentState>,
1593              (req, ctx): (
1594            host_http_client::Request,
1595            Option<host_http_client::TenantCtx>,
1596        )| {
1597            let host = caller.data_mut().host_mut();
1598            let result = HttpClientHost::send(host, req, ctx);
1599            Ok((result,))
1600        },
1601    )?;
1602    Ok(())
1603}
1604
1605fn alias_request_to_host(req: http_client_client_alias::Request) -> host_http_client::RequestV1_1 {
1606    host_http_client::RequestV1_1 {
1607        method: req.method,
1608        url: req.url,
1609        headers: req.headers,
1610        body: req.body,
1611    }
1612}
1613
1614fn alias_request_options_to_host(
1615    opts: http_client_client_alias::RequestOptions,
1616) -> host_http_client::RequestOptionsV1_1 {
1617    host_http_client::RequestOptionsV1_1 {
1618        timeout_ms: opts.timeout_ms,
1619        allow_insecure: opts.allow_insecure,
1620        follow_redirects: opts.follow_redirects,
1621    }
1622}
1623
1624fn alias_tenant_ctx_to_host(
1625    ctx: http_client_client_alias::TenantCtx,
1626) -> host_http_client::TenantCtxV1_1 {
1627    host_http_client::TenantCtxV1_1 {
1628        env: ctx.env,
1629        tenant: ctx.tenant,
1630        tenant_id: ctx.tenant_id,
1631        team: ctx.team,
1632        team_id: ctx.team_id,
1633        user: ctx.user,
1634        user_id: ctx.user_id,
1635        trace_id: ctx.trace_id,
1636        correlation_id: ctx.correlation_id,
1637        i18n_id: ctx.i18n_id,
1638        attributes: ctx.attributes,
1639        session_id: ctx.session_id,
1640        flow_id: ctx.flow_id,
1641        node_id: ctx.node_id,
1642        provider_id: ctx.provider_id,
1643        deadline_ms: ctx.deadline_ms,
1644        attempt: ctx.attempt,
1645        idempotency_key: ctx.idempotency_key,
1646        impersonation: ctx.impersonation.map(|imp| http_types_v1_1::Impersonation {
1647            actor_id: imp.actor_id,
1648            reason: imp.reason,
1649        }),
1650    }
1651}
1652
1653fn alias_response_from_host(
1654    resp: host_http_client::ResponseV1_1,
1655) -> http_client_client_alias::Response {
1656    http_client_client_alias::Response {
1657        status: resp.status,
1658        headers: resp.headers,
1659        body: resp.body,
1660    }
1661}
1662
1663fn alias_error_from_host(
1664    err: host_http_client::HttpClientErrorV1_1,
1665) -> http_client_client_alias::HostError {
1666    http_client_client_alias::HostError {
1667        code: err.code,
1668        message: err.message,
1669    }
1670}
1671
/// Gives the OAuth broker access to the tenant, environment and broker
/// state held by the component's host state.
impl OAuthHostContext for ComponentState {
    /// Tenant identifier taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// The host's default environment name.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the OAuth broker host stored in the host state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// The OAuth broker configuration, or `None` when none is set on the host.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
1689
1690impl WasiView for ComponentState {
1691    fn ctx(&mut self) -> WasiCtxView<'_> {
1692        WasiCtxView {
1693            ctx: &mut self.wasi_ctx,
1694            table: &mut self.resource_table,
1695        }
1696    }
1697}
1698
1699impl WasiHttpView for ComponentState {
1700    fn http(&mut self) -> WasiHttpCtxView<'_> {
1701        WasiHttpCtxView {
1702            ctx: &mut self.wasi_http_ctx,
1703            table: &mut self.resource_table,
1704            hooks: Default::default(),
1705        }
1706    }
1707}
1708
1709impl WasiTlsView for ComponentState {
1710    fn tls(&mut self) -> WasiTlsCtxView<'_> {
1711        WasiTlsCtxView {
1712            ctx: &mut self.wasi_tls_ctx,
1713            table: &mut self.resource_table,
1714        }
1715    }
1716}
1717
// NOTE(review): these impls assert that `ComponentState` may be moved to and
// shared between threads, bypassing the compiler's automatic `Send`/`Sync`
// derivation. The invariant that makes this sound is not visible in this
// part of the file — TODO confirm which field is not `Send`/`Sync` on its
// own and record a proper `SAFETY:` rationale here.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
1722
1723impl PackRuntime {
1724    fn allows_state_store(&self, component_ref: &str) -> bool {
1725        if self.state_store.is_none() {
1726            return false;
1727        }
1728        if !self.config.state_store_policy.allow {
1729            return false;
1730        }
1731        let Some(manifest) = self.component_manifests.get(component_ref) else {
1732            // No manifest entry — allow state-store; Wasmtime rejects if not imported.
1733            return true;
1734        };
1735        // If manifest declares host.state capabilities, honour them.
1736        // If host.state is None (not declared in manifest), default to true so
1737        // components whose CBOR manifest omits the field still get state-store
1738        // linked — Wasmtime will reject at instantiation if not actually imported.
1739        manifest
1740            .capabilities
1741            .host
1742            .state
1743            .as_ref()
1744            .map(|caps| caps.read || caps.write)
1745            .unwrap_or(true)
1746    }
1747
    /// Returns `true` when a compiled component is registered in this pack
    /// under the key `component_ref`.
    pub fn contains_component(&self, component_ref: &str) -> bool {
        self.components.contains_key(component_ref)
    }
1751
    /// Returns a clonable handle to the pack's state store, when one is
    /// configured. Used by the flow engine's built-in `state.get`/`state.set`
    /// operators which call into the same store that WASM components read
    /// through their `state.read`/`state.write` host imports.
    ///
    /// Returns `None` when the pack was loaded without a state store.
    pub fn state_store_handle(&self) -> Option<crate::storage::DynStateStore> {
        self.state_store.clone()
    }
1759
1760    #[allow(clippy::too_many_arguments)]
1761    pub async fn load(
1762        path: impl AsRef<Path>,
1763        config: Arc<HostConfig>,
1764        mocks: Option<Arc<MockLayer>>,
1765        archive_source: Option<&Path>,
1766        session_store: Option<DynSessionStore>,
1767        state_store: Option<DynStateStore>,
1768        wasi_policy: Arc<RunnerWasiPolicy>,
1769        secrets: DynSecretsManager,
1770        oauth_config: Option<OAuthBrokerConfig>,
1771        verify_archive: bool,
1772        component_resolution: ComponentResolution,
1773    ) -> Result<Self> {
1774        let path = path.as_ref();
1775        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1776        let path_meta = std::fs::metadata(&safe_path).ok();
1777        let is_dir = path_meta
1778            .as_ref()
1779            .map(|meta| meta.is_dir())
1780            .unwrap_or(false);
1781        let is_component = !is_dir
1782            && safe_path
1783                .extension()
1784                .and_then(|ext| ext.to_str())
1785                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1786                .unwrap_or(false);
1787        let archive_hint_path = if let Some(source) = archive_source {
1788            let (_, normalized) = normalize_pack_path(source)?;
1789            Some(normalized)
1790        } else if is_component || is_dir {
1791            None
1792        } else {
1793            Some(safe_path.clone())
1794        };
1795        let archive_hint = archive_hint_path.as_deref();
1796        if verify_archive {
1797            if let Some(verify_target) = archive_hint.and_then(|p| {
1798                std::fs::metadata(p)
1799                    .ok()
1800                    .filter(|meta| meta.is_file())
1801                    .map(|_| p)
1802            }) {
1803                verify::verify_pack(verify_target).await?;
1804                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1805            } else {
1806                tracing::debug!("skipping archive verification (no archive source)");
1807            }
1808        }
1809        let engine = Engine::default();
1810        let engine_profile =
1811            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1812        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1813        let mut metadata = PackMetadata::fallback(&safe_path);
1814        let mut manifest = None;
1815        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1816        let mut flows = None;
1817        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1818            if is_dir {
1819                Some(safe_path.clone())
1820            } else {
1821                None
1822            }
1823        });
1824        let (pack_assets_dir, assets_tempdir) =
1825            locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
1826        let setup_yaml_exists = pack_assets_dir
1827            .as_ref()
1828            .map(|dir| dir.join("setup.yaml").is_file())
1829            .unwrap_or(false);
1830        tracing::info!(
1831            pack_root = %safe_path.display(),
1832            assets_setup_yaml_exists = setup_yaml_exists,
1833            "pack unpack metadata"
1834        );
1835
1836        if let Some(root) = materialized_root.as_ref() {
1837            match load_manifest_and_flows_from_dir(root) {
1838                Ok(ManifestLoad::New {
1839                    manifest: m,
1840                    flows: cache,
1841                }) => {
1842                    metadata = cache.metadata.clone();
1843                    manifest = Some(*m);
1844                    flows = Some(cache);
1845                }
1846                Ok(ManifestLoad::Legacy {
1847                    manifest: m,
1848                    flows: cache,
1849                }) => {
1850                    metadata = cache.metadata.clone();
1851                    legacy_manifest = Some(m);
1852                    flows = Some(cache);
1853                }
1854                Err(err) => {
1855                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1856                }
1857            }
1858        }
1859
1860        if manifest.is_none()
1861            && legacy_manifest.is_none()
1862            && let Some(archive_path) = archive_hint
1863        {
1864            let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
1865                format!(
1866                    "failed to load manifest.cbor from {}",
1867                    archive_path.display()
1868                )
1869            })?;
1870            match manifest_load {
1871                ManifestLoad::New {
1872                    manifest: m,
1873                    flows: cache,
1874                } => {
1875                    metadata = cache.metadata.clone();
1876                    manifest = Some(*m);
1877                    flows = Some(cache);
1878                }
1879                ManifestLoad::Legacy {
1880                    manifest: m,
1881                    flows: cache,
1882                } => {
1883                    metadata = cache.metadata.clone();
1884                    legacy_manifest = Some(m);
1885                    flows = Some(cache);
1886                }
1887            }
1888        }
1889        #[cfg(feature = "fault-injection")]
1890        {
1891            let fault_ctx = FaultContext {
1892                pack_id: metadata.pack_id.as_str(),
1893                flow_id: "unknown",
1894                node_id: None,
1895                attempt: 1,
1896            };
1897            maybe_fail(FaultPoint::PackResolve, fault_ctx)
1898                .map_err(|err| anyhow!(err.to_string()))?;
1899        }
1900        let mut pack_lock = None;
1901        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1902            pack_lock = load_pack_lock(&root)?;
1903            if pack_lock.is_some() {
1904                break;
1905            }
1906        }
1907        let component_sources_payload = if pack_lock.is_none() {
1908            if let Some(manifest) = manifest.as_ref() {
1909                manifest
1910                    .get_component_sources_v1()
1911                    .context("invalid component sources extension")?
1912            } else {
1913                None
1914            }
1915        } else {
1916            None
1917        };
1918        let component_sources = if let Some(lock) = pack_lock.as_ref() {
1919            Some(component_sources_table_from_pack_lock(
1920                lock,
1921                component_resolution.allow_missing_hash,
1922            )?)
1923        } else {
1924            component_sources_table(component_sources_payload.as_ref())?
1925        };
1926        let components = if is_component {
1927            let wasm_bytes = fs::read(&safe_path).await?;
1928            metadata = PackMetadata::from_wasm(&wasm_bytes)
1929                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1930            let name = safe_path
1931                .file_stem()
1932                .map(|s| s.to_string_lossy().to_string())
1933                .unwrap_or_else(|| "component".to_string());
1934            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1935            let mut map = HashMap::new();
1936            map.insert(
1937                name.clone(),
1938                PackComponent {
1939                    name,
1940                    version: metadata.version.clone(),
1941                    component,
1942                },
1943            );
1944            map
1945        } else {
1946            let specs = component_specs(
1947                manifest.as_ref(),
1948                legacy_manifest.as_deref(),
1949                component_sources_payload.as_ref(),
1950                pack_lock.as_ref(),
1951            );
1952            if specs.is_empty() {
1953                HashMap::new()
1954            } else {
1955                let mut loaded = HashMap::new();
1956                let mut missing: HashSet<String> =
1957                    specs.iter().map(|spec| spec.id.clone()).collect();
1958                let mut searched = Vec::new();
1959
1960                if !component_resolution.overrides.is_empty() {
1961                    load_components_from_overrides(
1962                        &cache,
1963                        &engine,
1964                        &component_resolution.overrides,
1965                        &specs,
1966                        &mut missing,
1967                        &mut loaded,
1968                    )
1969                    .await?;
1970                    searched.push("override map".to_string());
1971                }
1972
1973                if let Some(component_sources) = component_sources.as_ref() {
1974                    load_components_from_sources(
1975                        &cache,
1976                        &engine,
1977                        component_sources,
1978                        &component_resolution,
1979                        &specs,
1980                        &mut missing,
1981                        &mut loaded,
1982                        materialized_root.as_deref(),
1983                        archive_hint,
1984                    )
1985                    .await?;
1986                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1987                }
1988
1989                if let Some(root) = materialized_root.as_ref() {
1990                    load_components_from_dir(
1991                        &cache,
1992                        &engine,
1993                        root,
1994                        &specs,
1995                        &mut missing,
1996                        &mut loaded,
1997                    )
1998                    .await?;
1999                    searched.push(format!("components dir {}", root.display()));
2000                }
2001
2002                if let Some(archive_path) = archive_hint {
2003                    load_components_from_archive(
2004                        &cache,
2005                        &engine,
2006                        archive_path,
2007                        &specs,
2008                        &mut missing,
2009                        &mut loaded,
2010                    )
2011                    .await?;
2012                    searched.push(format!("archive {}", archive_path.display()));
2013                }
2014
2015                if !missing.is_empty() {
2016                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
2017                    let sources = if searched.is_empty() {
2018                        "no component sources".to_string()
2019                    } else {
2020                        searched.join(", ")
2021                    };
2022                    bail!(
2023                        "components missing: {}; looked in {}",
2024                        missing_list,
2025                        sources
2026                    );
2027                }
2028
2029                loaded
2030            }
2031        };
2032        let http_client = Arc::clone(&HTTP_CLIENT);
2033        let mut component_manifests = HashMap::new();
2034        if let Some(manifest) = manifest.as_ref() {
2035            for component in &manifest.components {
2036                component_manifests.insert(component.id.as_str().to_string(), component.clone());
2037            }
2038        }
2039        let mut pack_policy = (*wasi_policy).clone();
2040        if let Some(dir) = pack_assets_dir {
2041            tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
2042            pack_policy =
2043                pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
2044        }
2045        let wasi_policy = Arc::new(pack_policy);
2046        Ok(Self {
2047            path: safe_path,
2048            archive_path: archive_hint.map(Path::to_path_buf),
2049            config,
2050            engine,
2051            metadata,
2052            manifest,
2053            legacy_manifest,
2054            component_manifests,
2055            mocks,
2056            flows,
2057            components,
2058            http_client,
2059            pre_cache: Mutex::new(HashMap::new()),
2060            session_store,
2061            state_store,
2062            wasi_policy,
2063            assets_tempdir,
2064            provider_registry: RwLock::new(None),
2065            identify_hint_cache: RwLock::new(HashMap::new()),
2066            secrets,
2067            oauth_config,
2068            cache,
2069            runtime_config_non_secret: None,
2070            runtime_refs: None,
2071        })
2072    }
2073
    /// Inject the `pack-config.v1.non_secret` map for this pack. Called by
    /// the producer (greentic-start, C4.3) after loading the deployed
    /// `PackConfig`. Passing `None` clears any previously-set map.
    ///
    /// The given value replaces whatever was stored before.
    pub fn set_runtime_config_non_secret(&mut self, map: Option<Arc<BTreeMap<String, Value>>>) {
        self.runtime_config_non_secret = map;
    }
2080
    /// Read-only accessor for the injected `pack-config.v1.non_secret` map.
    /// Used by the revision loader's tests to assert producer plumbing.
    ///
    /// Returns `None` when no map has been injected (or it was cleared).
    pub fn runtime_config_non_secret(&self) -> Option<&Arc<BTreeMap<String, Value>>> {
        self.runtime_config_non_secret.as_ref()
    }
2086
    /// Inject the `pack-config.v1.runtime_refs` channel (C5): per-pack
    /// `key → URI` bindings plus the env-shared resolver. Called by
    /// greentic-start after loading the deployed `PackConfig`. Passing
    /// `None` clears any previously-set injection.
    ///
    /// The given value replaces whatever was stored before.
    pub fn set_runtime_refs(&mut self, injection: Option<RuntimeRefsInjection>) {
        self.runtime_refs = injection;
    }
2094
    /// Read-only accessor for the injected runtime-refs channel. Used by
    /// the revision loader's tests to assert producer plumbing.
    ///
    /// Returns `None` when no injection has been set (or it was cleared).
    pub fn runtime_refs(&self) -> Option<&RuntimeRefsInjection> {
        self.runtime_refs.as_ref()
    }
2100
2101    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
2102        if let Some(cache) = &self.flows {
2103            return Ok(cache.descriptors.clone());
2104        }
2105        if let Some(manifest) = &self.manifest {
2106            let descriptors = manifest
2107                .flows
2108                .iter()
2109                .map(|flow| FlowDescriptor {
2110                    id: flow.id.as_str().to_string(),
2111                    flow_type: flow_kind_to_str(flow.kind).to_string(),
2112                    pack_id: manifest.pack_id.as_str().to_string(),
2113                    profile: manifest.pack_id.as_str().to_string(),
2114                    version: manifest.version.to_string(),
2115                    description: None,
2116                })
2117                .collect();
2118            return Ok(descriptors);
2119        }
2120        Ok(Vec::new())
2121    }
2122
2123    #[allow(dead_code)]
2124    pub async fn run_flow(
2125        &self,
2126        flow_id: &str,
2127        input: serde_json::Value,
2128    ) -> Result<serde_json::Value> {
2129        let pack = Arc::new(
2130            PackRuntime::load(
2131                &self.path,
2132                Arc::clone(&self.config),
2133                self.mocks.clone(),
2134                self.archive_path.as_deref(),
2135                self.session_store.clone(),
2136                self.state_store.clone(),
2137                Arc::clone(&self.wasi_policy),
2138                self.secrets.clone(),
2139                self.oauth_config.clone(),
2140                false,
2141                ComponentResolution::default(),
2142            )
2143            .await?,
2144        );
2145
2146        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
2147        let retry_config = self.config.retry_config().into();
2148        let mocks = pack.mocks.as_deref();
2149        let tenant = self.config.tenant.as_str();
2150
2151        let ctx = FlowContext {
2152            tenant,
2153            pack_id: pack.metadata().pack_id.as_str(),
2154            flow_id,
2155            node_id: None,
2156            tool: None,
2157            action: None,
2158            session_id: None,
2159            provider_id: None,
2160            retry_config,
2161            attempt: 1,
2162            observer: None,
2163            mocks,
2164        };
2165
2166        let execution = engine.execute(ctx, input).await?;
2167        match execution.status {
2168            FlowStatus::Completed => Ok(execution.output),
2169            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
2170                "status": "pending",
2171                "reason": wait.reason,
2172                "resume": wait.snapshot,
2173                "response": execution.output,
2174            })),
2175        }
2176    }
2177
2178    pub async fn invoke_component(
2179        &self,
2180        component_ref: &str,
2181        ctx: ComponentExecCtx,
2182        operation: &str,
2183        config_json: Option<String>,
2184        input_json: String,
2185    ) -> Result<Value> {
2186        let pack_component = self
2187            .components
2188            .get(component_ref)
2189            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
2190        let engine = self.engine.clone();
2191        let config = Arc::clone(&self.config);
2192        let http_client = Arc::clone(&self.http_client);
2193        let mocks = self.mocks.clone();
2194        let session_store = self.session_store.clone();
2195        let state_store = self.state_store.clone();
2196        let secrets = Arc::clone(&self.secrets);
2197        let oauth_config = self.oauth_config.clone();
2198        let wasi_policy = Arc::clone(&self.wasi_policy);
2199        let pack_id = self.metadata().pack_id.clone();
2200        let allow_state_store = self.allows_state_store(component_ref);
2201        let component = pack_component.component.clone();
2202        let component_ref_owned = component_ref.to_string();
2203        let operation_owned = operation.to_string();
2204        let input_owned =
2205            Self::merge_component_config_into_input_json(config_json.as_deref(), &input_json)
2206                .context("merge component config into invocation payload")?;
2207        let ctx_owned = ctx;
2208        let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2209        let runtime_refs = self.runtime_refs.clone();
2210
2211        run_on_wasi_thread("component.invoke", move || {
2212            let mut linker = Linker::new(&engine);
2213            register_all(&mut linker, allow_state_store)?;
2214            add_component_control_to_linker(&mut linker)?;
2215
2216            let host_state = HostState::new(
2217                pack_id.clone(),
2218                config,
2219                http_client,
2220                mocks,
2221                session_store,
2222                state_store,
2223                secrets,
2224                oauth_config,
2225                Some(ctx_owned.clone()),
2226                Some(component_ref_owned.clone()),
2227                false,
2228                runtime_config_non_secret,
2229                runtime_refs,
2230            )?;
2231            let store_state = ComponentState::new(host_state, wasi_policy)?;
2232            let mut store = wasmtime::Store::new(&engine, store_state);
2233
2234            let invoke_result = HostState::instantiate_component_result(
2235                &mut linker,
2236                &mut store,
2237                &component,
2238                &ctx_owned,
2239                &component_ref_owned,
2240                &operation_owned,
2241                &input_owned,
2242            )?;
2243            HostState::convert_invoke_result(invoke_result)
2244        })
2245    }
2246
2247    fn merge_component_config_into_input_json(
2248        config_json: Option<&str>,
2249        input_json: &str,
2250    ) -> Result<String> {
2251        let Some(config_json) = config_json else {
2252            return Ok(input_json.to_string());
2253        };
2254
2255        let config_value: Value =
2256            serde_json::from_str(config_json).context("parse component config JSON")?;
2257
2258        if let Ok(mut invocation) =
2259            serde_json::from_str::<greentic_types::InvocationEnvelope>(input_json)
2260        {
2261            let payload_value = serde_json::from_slice(&invocation.payload).unwrap_or_else(|_| {
2262                Value::String(String::from_utf8_lossy(&invocation.payload).into_owned())
2263            });
2264            invocation.payload = serde_json::to_vec(&serde_json::json!({
2265                "config": config_value,
2266                "input": payload_value,
2267            }))
2268            .context("serialize merged invocation payload")?;
2269            return serde_json::to_string(&invocation)
2270                .context("serialize merged invocation envelope");
2271        }
2272
2273        let input_value = serde_json::from_str(input_json)
2274            .unwrap_or_else(|_| Value::String(input_json.to_string()));
2275        serde_json::to_string(&serde_json::json!({
2276            "config": config_value,
2277            "input": input_value,
2278        }))
2279        .context("serialize merged component input")
2280    }
2281
2282    pub fn resolve_provider(
2283        &self,
2284        provider_id: Option<&str>,
2285        provider_type: Option<&str>,
2286    ) -> Result<ProviderBinding> {
2287        let registry = self.provider_registry()?;
2288        registry.resolve(provider_id, provider_type)
2289    }
2290
2291    pub async fn invoke_provider(
2292        &self,
2293        binding: &ProviderBinding,
2294        ctx: ComponentExecCtx,
2295        op: &str,
2296        input_json: Vec<u8>,
2297    ) -> Result<Value> {
2298        let component_ref_owned = binding.component_ref.clone();
2299        let pack_component = self.components.get(&component_ref_owned).with_context(|| {
2300            format!("provider component '{component_ref_owned}' not found in pack")
2301        })?;
2302        let component = pack_component.component.clone();
2303
2304        let engine = self.engine.clone();
2305        let config = Arc::clone(&self.config);
2306        let http_client = Arc::clone(&self.http_client);
2307        let mocks = self.mocks.clone();
2308        let session_store = self.session_store.clone();
2309        let state_store = self.state_store.clone();
2310        let secrets = Arc::clone(&self.secrets);
2311        let oauth_config = self.oauth_config.clone();
2312        let wasi_policy = Arc::clone(&self.wasi_policy);
2313        let pack_id = self.metadata().pack_id.clone();
2314        let allow_state_store = self.allows_state_store(&component_ref_owned);
2315        let input_owned = input_json;
2316        let op_owned = op.to_string();
2317        let ctx_owned = ctx;
2318        let world = binding.world.clone();
2319        let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2320        let runtime_refs = self.runtime_refs.clone();
2321
2322        run_on_wasi_thread("provider.invoke", move || {
2323            let mut linker = Linker::new(&engine);
2324            register_all(&mut linker, allow_state_store)?;
2325            add_component_control_to_linker(&mut linker)?;
2326            let host_state = HostState::new(
2327                pack_id.clone(),
2328                config,
2329                http_client,
2330                mocks,
2331                session_store,
2332                state_store,
2333                secrets,
2334                oauth_config,
2335                Some(ctx_owned.clone()),
2336                Some(component_ref_owned.clone()),
2337                true,
2338                runtime_config_non_secret,
2339                runtime_refs,
2340            )?;
2341            let store_state = ComponentState::new(host_state, wasi_policy)?;
2342            let mut store = wasmtime::Store::new(&engine, store_state);
2343            let use_schema_core_schema = world.contains("provider-schema-core");
2344            let use_schema_core_path = world.contains("provider/schema-core");
2345            let result = if use_schema_core_schema {
2346                let pre_instance = linker.instantiate_pre(component.as_ref())?;
2347                let pre: SchemaSchemaCorePre<ComponentState> =
2348                    SchemaSchemaCorePre::new(pre_instance)?;
2349                let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2350                let provider = bindings.greentic_provider_schema_core_schema_core_api();
2351                provider.call_invoke(&mut store, &op_owned, &input_owned)?
2352            } else if use_schema_core_path {
2353                let pre_instance = linker.instantiate_pre(component.as_ref())?;
2354                let path_attempt = (|| -> Result<Vec<u8>> {
2355                    let pre: PathSchemaCorePre<ComponentState> =
2356                        PathSchemaCorePre::new(pre_instance)?;
2357                    let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2358                    let provider = bindings.greentic_provider_schema_core_api();
2359                    Ok(provider.call_invoke(&mut store, &op_owned, &input_owned)?)
2360                })();
2361                match path_attempt {
2362                    Ok(value) => value,
2363                    Err(path_err)
2364                        if path_err.to_string().contains("no exported instance named") =>
2365                    {
2366                        let pre_instance = linker.instantiate_pre(component.as_ref())?;
2367                        let pre: SchemaSchemaCorePre<ComponentState> =
2368                            SchemaSchemaCorePre::new(pre_instance)?;
2369                        let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2370                        let provider = bindings.greentic_provider_schema_core_schema_core_api();
2371                        provider.call_invoke(&mut store, &op_owned, &input_owned)?
2372                    }
2373                    Err(path_err) => return Err(path_err),
2374                }
2375            } else {
2376                let pre_instance = linker.instantiate_pre(component.as_ref())?;
2377                let pre: LegacySchemaCorePre<ComponentState> =
2378                    LegacySchemaCorePre::new(pre_instance)?;
2379                let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2380                let provider = bindings.greentic_provider_core_schema_core_api();
2381                provider.call_invoke(&mut store, &op_owned, &input_owned)?
2382            };
2383            deserialize_json_bytes(result)
2384        })
2385    }
2386
2387    /// Call the provider component's `identify-instance` export
2388    /// (`greentic:provider-instance-identity@0.1.0`) with the inbound
2389    /// payload bytes. Returns an [`IdentifyOutcome`] — see the variant
2390    /// docs for the per-case contract.
2391    ///
2392    /// # Payload shape (M1 IID.4d wrapper)
2393    ///
2394    /// `payload` is forwarded opaque to the component. The shape is set by
2395    /// the caller; the M1 IID.4d wrapper convention from `greentic-start`
2396    /// is `{headers: [{name,value}], body: <parsed-or-null>}` so providers
2397    /// whose discriminator lives in HTTP headers (Telegram via
2398    /// `x-telegram-bot-api-secret-token`) can identify the instance the
2399    /// same call shape that body-based providers (Teams, Slack, Webex,
2400    /// etc.) use. See the docstring on
2401    /// `greentic:provider-instance-identity/instance-identity-api.identify-instance`
2402    /// for the full contract; this host method does not parse or
2403    /// validate the bytes.
2404    ///
2405    /// # Host authority on identity probes
2406    ///
2407    /// The linker registers the full host import surface (Wasmtime
2408    /// validates all imports eagerly at `instantiate_pre`, not just
2409    /// those reachable from the invoked export). The WASI sandbox is
2410    /// locked down: no preopens, no env, no stdio. Deny-shim linker
2411    /// handlers (trap on call, satisfy at link time) are a follow-up
2412    /// in `greentic-interfaces-wasmtime`. See [`register_identity_probe`].
2413    pub async fn invoke_identify_instance(
2414        &self,
2415        binding: &ProviderBinding,
2416        payload: Vec<u8>,
2417    ) -> Result<IdentifyOutcome> {
2418        let component_ref_owned = binding.component_ref.clone();
2419        let pack_component = self.components.get(&component_ref_owned).with_context(|| {
2420            format!("provider component '{component_ref_owned}' not found in pack")
2421        })?;
2422        let component = pack_component.component.clone();
2423
2424        let engine = self.engine.clone();
2425        let config = Arc::clone(&self.config);
2426        let http_client = Arc::clone(&self.http_client);
2427        let mocks = self.mocks.clone();
2428        let session_store = self.session_store.clone();
2429        let state_store = self.state_store.clone();
2430        let secrets = Arc::clone(&self.secrets);
2431        let oauth_config = self.oauth_config.clone();
2432        let pack_id = self.metadata().pack_id.clone();
2433
2434        // Locked-down WASI policy: no preopens, no env, no stdio.
2435        // The linker registers all imports (Wasmtime requires it for
2436        // instantiate_pre), but the WASI sandbox is the tightest we
2437        // can enforce today. See [`register_identity_probe`] docs.
2438        let wasi_policy = Arc::new(RunnerWasiPolicy::probe());
2439        let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2440        let runtime_refs = self.runtime_refs.clone();
2441        run_on_wasi_thread("provider.identify_instance", move || {
2442            let mut linker = Linker::new(&engine);
2443            register_identity_probe(&mut linker)?;
2444            let host_state = HostState::new(
2445                pack_id.clone(),
2446                config,
2447                http_client,
2448                mocks,
2449                session_store,
2450                state_store,
2451                secrets,
2452                oauth_config,
2453                None,
2454                Some(component_ref_owned.clone()),
2455                true,
2456                runtime_config_non_secret,
2457                runtime_refs,
2458            )?;
2459            let store_state = ComponentState::new(host_state, wasi_policy)?;
2460            let mut store = wasmtime::Store::new(&engine, store_state);
2461
2462            let pre_instance = linker.instantiate_pre(component.as_ref())?;
2463            let pre = match InstanceIdentityPre::<ComponentState>::new(pre_instance) {
2464                Ok(pre) => pre,
2465                Err(err) if is_missing_export_error(&format!("{err:#}")) => {
2466                    return Ok(IdentifyOutcome::Unsupported);
2467                }
2468                Err(err) => return Err(err.into()),
2469            };
2470            let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2471            let api = bindings.greentic_provider_instance_identity_instance_identity_api();
2472            let result = api.call_identify_instance(&mut store, &payload)?;
2473            Ok(match result {
2474                Some(id) => IdentifyOutcome::Identified(id),
2475                None => IdentifyOutcome::NoMatch,
2476            })
2477        })
2478    }
2479
2480    /// Call the provider component's `describe-identify-instance` export
2481    /// (`greentic:provider-instance-identity/instance-identity-describe@0.1.0`)
2482    /// and parse the returned JSON into an [`IdentifyInstanceHint`].
2483    ///
2484    /// Returns `Ok(None)` for every "no hint available" case: the
2485    /// component does not export the describe world, the export returned
2486    /// `none`, the returned bytes are not valid JSON, or the `version`
2487    /// gate failed. The two malformed cases are warn-logged so a typo'd
2488    /// hint surfaces in operator logs without blocking ingest. Component
2489    /// traps and other infrastructure errors propagate as `Err`.
2490    ///
2491    /// This is the uncached probe — see [`resolve_identify_hint`] for the
2492    /// cached wrapper that callers SHOULD use on the inbound hot path.
2493    ///
2494    /// [`resolve_identify_hint`]: PackRuntime::resolve_identify_hint
2495    pub async fn invoke_describe_identify_instance(
2496        &self,
2497        binding: &ProviderBinding,
2498    ) -> Result<Option<IdentifyInstanceHint>> {
2499        let component_ref_owned = binding.component_ref.clone();
2500        let pack_component = self.components.get(&component_ref_owned).with_context(|| {
2501            format!("provider component '{component_ref_owned}' not found in pack")
2502        })?;
2503        let component = pack_component.component.clone();
2504
2505        let engine = self.engine.clone();
2506        let config = Arc::clone(&self.config);
2507        let http_client = Arc::clone(&self.http_client);
2508        let mocks = self.mocks.clone();
2509        let session_store = self.session_store.clone();
2510        let state_store = self.state_store.clone();
2511        let secrets = Arc::clone(&self.secrets);
2512        let oauth_config = self.oauth_config.clone();
2513        let pack_id = self.metadata().pack_id.clone();
2514
2515        // Locked-down WASI policy — same rationale as
2516        // `invoke_identify_instance`. See [`register_identity_probe`] docs.
2517        let wasi_policy = Arc::new(RunnerWasiPolicy::probe());
2518        let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2519        let runtime_refs = self.runtime_refs.clone();
2520        run_on_wasi_thread("provider.describe_identify_instance", move || {
2521            let mut linker = Linker::new(&engine);
2522            register_identity_probe(&mut linker)?;
2523            let host_state = HostState::new(
2524                pack_id.clone(),
2525                config,
2526                http_client,
2527                mocks,
2528                session_store,
2529                state_store,
2530                secrets,
2531                oauth_config,
2532                None,
2533                Some(component_ref_owned.clone()),
2534                true,
2535                runtime_config_non_secret,
2536                runtime_refs,
2537            )?;
2538            let store_state = ComponentState::new(host_state, wasi_policy)?;
2539            let mut store = wasmtime::Store::new(&engine, store_state);
2540
2541            let pre_instance = linker.instantiate_pre(component.as_ref())?;
2542            let pre = match InstanceIdentityDescribePre::<ComponentState>::new(pre_instance) {
2543                Ok(pre) => pre,
2544                Err(err) if is_missing_export_error(&format!("{err:#}")) => {
2545                    return Ok(None);
2546                }
2547                Err(err) => return Err(err.into()),
2548            };
2549            let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2550            let api = bindings.greentic_provider_instance_identity_instance_identity_describe_api();
2551            let raw = api.call_describe_identify_instance(&mut store)?;
2552            let Some(bytes) = raw else {
2553                // Component exported the world but said "no hint right now".
2554                // Per the WIT contract this is equivalent to a missing
2555                // export — unhinted fallback at the caller.
2556                return Ok(None);
2557            };
2558            match IdentifyInstanceHint::from_json(&bytes) {
2559                Ok(hint) => Ok(Some(hint)),
2560                Err(err) => {
2561                    // Malformed hint or wrong version. Don't fail closed:
2562                    // the contract demands the host fall back to unhinted
2563                    // (invoke identify-instance with the global allowlist).
2564                    // Warn so the provider author can fix the hint.
2565                    warn!(
2566                        event = "provider.describe_identify_instance.malformed",
2567                        component_ref = %component_ref_owned,
2568                        error = %err,
2569                        "ignoring malformed describe-identify-instance hint; \
2570                         falling back to unhinted wrapper"
2571                    );
2572                    Ok(None)
2573                }
2574            }
2575        })
2576    }
2577
2578    /// Cached wrapper around [`invoke_describe_identify_instance`]. The
2579    /// hint for a given `binding.component_ref` is invariant across
2580    /// inbound requests within a revision (it is a function of the
2581    /// component itself, not of the payload), so we probe lazily on
2582    /// first ask and reuse thereafter. `ArcSwap`-driven revision swaps
2583    /// allocate a fresh [`PackRuntime`], naturally invalidating the cache.
2584    ///
2585    /// Returns `None` when the component does not export the describe
2586    /// world, when the probe returns no hint, or when the probe fails
2587    /// (trap, timeout, instantiation error). Failures are warn-logged
2588    /// and cached — the same trap is logged once per revision per
2589    /// component, not per request.
2590    ///
2591    /// [`invoke_describe_identify_instance`]:
2592    ///     PackRuntime::invoke_describe_identify_instance
2593    pub async fn resolve_identify_hint(
2594        &self,
2595        binding: &ProviderBinding,
2596    ) -> Option<IdentifyInstanceHint> {
2597        if let Some(cached) = self.identify_hint_cache.read().get(&binding.component_ref) {
2598            return cached.clone();
2599        }
2600        let hint = match self.invoke_describe_identify_instance(binding).await {
2601            Ok(hint) => hint,
2602            Err(err) => {
2603                warn!(
2604                    event = "provider.describe_identify_instance.failed",
2605                    component_ref = %binding.component_ref,
2606                    error = %err,
2607                    "describe-identify-instance probe failed; \
2608                     falling back to unhinted wrapper for this component"
2609                );
2610                None
2611            }
2612        };
2613        // Tolerate a concurrent populate — `insert` is idempotent on the
2614        // same (component_ref, hint) shape and the probe is pure w.r.t.
2615        // the component, so re-probing on a write-race yields identical
2616        // bytes.
2617        self.identify_hint_cache
2618            .write()
2619            .insert(binding.component_ref.clone(), hint.clone());
2620        hint
2621    }
2622
2623    /// Fan out [`resolve_identify_hint`] over each requested `provider_type`.
2624    /// Result map is keyed by `provider_type`; `None` value means the
2625    /// pack has no binding for that type OR the binding's component does
2626    /// not export the describe world (unhinted — caller forwards input
2627    /// headers unfiltered for back-compat).
2628    ///
2629    /// `provider_id`-collision errors from [`ProviderRegistry::resolve`]
2630    /// against a `provider_type` query are propagated (M1.1 invariant
2631    /// violation, malformed pack).
2632    ///
2633    /// Fan out [`resolve_identify_hint`] across requested types. `None` value
2634    /// means the pack has no binding for that type OR the binding's component
2635    /// does not export the describe world.
2636    ///
2637    /// The per-binding loop is inlined (rather than factored into a shared
2638    /// `AsyncFnMut`-based helper) deliberately: routing through an
2639    /// `AsyncFnMut` closure destabilises HRTB `Send` inference for the
2640    /// returned future, which propagates up to host-level fan-out APIs and
2641    /// from there to downstream spawned-service consumers. See the
2642    /// regression test `identify_futures_are_send` on the host.
2643    ///
2644    /// [`resolve_identify_hint`]: PackRuntime::resolve_identify_hint
2645    pub async fn describe_identify_hints_by_provider_type(
2646        &self,
2647        provider_types: &[&str],
2648    ) -> Result<HashMap<String, Option<IdentifyInstanceHint>>> {
2649        let mut out = HashMap::with_capacity(provider_types.len());
2650        let registry = match self.provider_registry_optional()? {
2651            Some(registry) => registry,
2652            None => {
2653                for ty in provider_types {
2654                    out.insert((*ty).to_string(), None);
2655                }
2656                return Ok(out);
2657            }
2658        };
2659        for ty in provider_types {
2660            let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2661                out.insert((*ty).to_string(), None);
2662                continue;
2663            };
2664            let hint = self.resolve_identify_hint(&binding).await;
2665            out.insert((*ty).to_string(), hint);
2666        }
2667        Ok(out)
2668    }
2669
2670    /// Unscoped legacy API: fan out [`invoke_identify_instance`] with the
2671    /// caller-supplied opaque `payload` bytes forwarded verbatim. No
2672    /// describe-identify-instance hint lookup, no per-provider header
2673    /// scoping. New callers should use the `_scoped` sibling for
2674    /// per-provider header allowlist scoping (Phase D).
2675    ///
2676    /// Loop inlined for the same reason as
2677    /// [`describe_identify_hints_by_provider_type`].
2678    ///
2679    /// [`invoke_identify_instance`]: PackRuntime::invoke_identify_instance
2680    /// [`describe_identify_hints_by_provider_type`]:
2681    ///     PackRuntime::describe_identify_hints_by_provider_type
2682    pub async fn identify_endpoints_by_provider_type(
2683        &self,
2684        provider_types: &[&str],
2685        payload: &[u8],
2686    ) -> Result<HashMap<String, IdentifyOutcome>> {
2687        let mut out = HashMap::with_capacity(provider_types.len());
2688        let registry = match self.provider_registry_optional()? {
2689            Some(registry) => registry,
2690            None => {
2691                for ty in provider_types {
2692                    out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2693                }
2694                return Ok(out);
2695            }
2696        };
2697        for ty in provider_types {
2698            let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2699                out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2700                continue;
2701            };
2702            let outcome = self
2703                .invoke_identify_instance(&binding, payload.to_vec())
2704                .await?;
2705            out.insert((*ty).to_string(), outcome);
2706        }
2707        Ok(out)
2708    }
2709
2710    /// Per-provider scoped variant of [`identify_endpoints_by_provider_type`].
2711    ///
2712    /// The wrapper payload is built per-binding from `(headers, body)` and
2713    /// the component's cached identify-instance hint (see
2714    /// [`resolve_identify_hint`]): hinted providers see only the headers
2715    /// their hint declares; unhinted providers see every header passed in.
2716    /// Result-map semantics match the unscoped variant.
2717    ///
2718    /// Loop inlined for the same reason as
2719    /// [`describe_identify_hints_by_provider_type`].
2720    ///
2721    /// [`identify_endpoints_by_provider_type`]:
2722    ///     PackRuntime::identify_endpoints_by_provider_type
2723    /// [`resolve_identify_hint`]: PackRuntime::resolve_identify_hint
2724    /// [`describe_identify_hints_by_provider_type`]:
2725    ///     PackRuntime::describe_identify_hints_by_provider_type
2726    pub async fn identify_endpoints_by_provider_type_scoped(
2727        &self,
2728        provider_types: &[&str],
2729        headers: &[(String, String)],
2730        body: &Value,
2731    ) -> Result<HashMap<String, IdentifyOutcome>> {
2732        let mut out = HashMap::with_capacity(provider_types.len());
2733        let registry = match self.provider_registry_optional()? {
2734            Some(registry) => registry,
2735            None => {
2736                for ty in provider_types {
2737                    out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2738                }
2739                return Ok(out);
2740            }
2741        };
2742        for ty in provider_types {
2743            let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2744                out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2745                continue;
2746            };
2747            let hint = self.resolve_identify_hint(&binding).await;
2748            let payload = build_scoped_identify_payload(headers, body, hint.as_ref());
2749            let outcome = self.invoke_identify_instance(&binding, payload).await?;
2750            out.insert((*ty).to_string(), outcome);
2751        }
2752        Ok(out)
2753    }
2754
2755    pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
2756        if let Some(registry) = self.provider_registry.read().clone() {
2757            return Ok(registry);
2758        }
2759        let manifest = self
2760            .manifest
2761            .as_ref()
2762            .context("pack manifest required for provider resolution")?;
2763        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
2764        let registry = ProviderRegistry::new(
2765            manifest,
2766            self.state_store.clone(),
2767            &self.config.tenant,
2768            &env,
2769        )?;
2770        *self.provider_registry.write() = Some(registry.clone());
2771        Ok(registry)
2772    }
2773
2774    pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
2775        if self.manifest.is_none() {
2776            return Ok(None);
2777        }
2778        Ok(Some(self.provider_registry()?))
2779    }
2780
2781    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
2782        if let Some(cache) = &self.flows {
2783            return cache
2784                .flows
2785                .get(flow_id)
2786                .cloned()
2787                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
2788        }
2789        if let Some(manifest) = &self.manifest {
2790            let entry = manifest
2791                .flows
2792                .iter()
2793                .find(|f| f.id.as_str() == flow_id)
2794                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
2795            return Ok(entry.flow.clone());
2796        }
2797        bail!("flow '{flow_id}' not available (pack exports disabled)")
2798    }
2799
2800    pub fn metadata(&self) -> &PackMetadata {
2801        &self.metadata
2802    }
2803
2804    /// Read an asset file from the pack's assets directory.
2805    ///
2806    /// Accepts paths like `assets/cards/card-a.json` or `cards/card-a.json`
2807    /// (the `assets/` prefix is stripped automatically).
2808    pub fn read_asset(&self, asset_path: &str) -> Result<Vec<u8>> {
2809        let normalized = asset_path
2810            .trim_start_matches("assets/")
2811            .trim_start_matches("/assets/");
2812        // Try assets tempdir first (extracted from archive).
2813        if let Some(tempdir) = &self.assets_tempdir {
2814            let full = tempdir.path().join("assets").join(normalized);
2815            if full.exists() {
2816                return std::fs::read(&full)
2817                    .with_context(|| format!("read asset {}", full.display()));
2818            }
2819        }
2820        // Try materialized directory.
2821        let full = self.path.join("assets").join(normalized);
2822        if full.exists() {
2823            return std::fs::read(&full).with_context(|| format!("read asset {}", full.display()));
2824        }
2825        bail!("asset not found: {}", asset_path)
2826    }
2827
2828    pub fn component_manifest(&self, component_ref: &str) -> Option<&ComponentManifest> {
2829        self.component_manifests.get(component_ref)
2830    }
2831
2832    pub fn describe_component_contract_v0_6(&self, component_ref: &str) -> Result<Option<Value>> {
2833        let pack_component = self
2834            .components
2835            .get(component_ref)
2836            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
2837        let engine = self.engine.clone();
2838        let config = Arc::clone(&self.config);
2839        let http_client = Arc::clone(&self.http_client);
2840        let mocks = self.mocks.clone();
2841        let session_store = self.session_store.clone();
2842        let state_store = self.state_store.clone();
2843        let secrets = Arc::clone(&self.secrets);
2844        let oauth_config = self.oauth_config.clone();
2845        let wasi_policy = Arc::clone(&self.wasi_policy);
2846        let pack_id = self.metadata().pack_id.clone();
2847        let allow_state_store = self.allows_state_store(component_ref);
2848        let component = pack_component.component.clone();
2849        let component_ref_owned = component_ref.to_string();
2850        let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2851        let runtime_refs = self.runtime_refs.clone();
2852
2853        run_on_wasi_thread("component.describe", move || {
2854            let mut linker = Linker::new(&engine);
2855            register_all(&mut linker, allow_state_store)?;
2856            add_component_control_to_linker(&mut linker)?;
2857
2858            let host_state = HostState::new(
2859                pack_id.clone(),
2860                config,
2861                http_client,
2862                mocks,
2863                session_store,
2864                state_store,
2865                secrets,
2866                oauth_config,
2867                None,
2868                Some(component_ref_owned),
2869                false,
2870                runtime_config_non_secret,
2871                runtime_refs,
2872            )?;
2873            let store_state = ComponentState::new(host_state, wasi_policy)?;
2874            let mut store = wasmtime::Store::new(&engine, store_state);
2875            let pre_instance = linker.instantiate_pre(&component)?;
2876            let pre = match component_api::v0_6_descriptor::ComponentV0V6V0Pre::new(pre_instance) {
2877                Ok(pre) => pre,
2878                Err(_) => return Ok(None),
2879            };
2880            let bytes = block_on(async {
2881                let bindings = pre.instantiate_async(&mut store).await?;
2882                let descriptor = bindings.greentic_component_component_descriptor();
2883                descriptor.call_describe(&mut store)
2884            })?;
2885
2886            if bytes.is_empty() {
2887                return Ok(Some(Value::Null));
2888            }
2889            if let Ok(value) = serde_cbor::from_slice::<Value>(&bytes) {
2890                return Ok(Some(value));
2891            }
2892            if let Ok(value) = serde_json::from_slice::<Value>(&bytes) {
2893                return Ok(Some(value));
2894            }
2895            if let Ok(text) = String::from_utf8(bytes) {
2896                if let Ok(value) = serde_json::from_str::<Value>(&text) {
2897                    return Ok(Some(value));
2898                }
2899                return Ok(Some(Value::String(text)));
2900            }
2901            Ok(Some(Value::Null))
2902        })
2903    }
2904
2905    pub fn load_schema_json(&self, schema_ref: &str) -> Result<Option<Value>> {
2906        let rel = normalize_schema_ref(schema_ref)?;
2907        if self.path.is_dir() {
2908            let candidate = self.path.join(&rel);
2909            if candidate.exists() {
2910                let bytes = std::fs::read(&candidate).with_context(|| {
2911                    format!("failed to read schema file {}", candidate.display())
2912                })?;
2913                let value = serde_json::from_slice::<Value>(&bytes)
2914                    .with_context(|| format!("invalid schema JSON in {}", candidate.display()))?;
2915                return Ok(Some(value));
2916            }
2917        }
2918
2919        if let Some(archive_path) = self
2920            .archive_path
2921            .as_ref()
2922            .or_else(|| path_is_gtpack(&self.path).then_some(&self.path))
2923        {
2924            let file = File::open(archive_path)
2925                .with_context(|| format!("failed to open {}", archive_path.display()))?;
2926            let mut archive = ZipArchive::new(file)
2927                .with_context(|| format!("failed to read pack {}", archive_path.display()))?;
2928            match archive.by_name(&rel) {
2929                Ok(mut entry) => {
2930                    let mut bytes = Vec::new();
2931                    entry.read_to_end(&mut bytes)?;
2932                    let value = serde_json::from_slice::<Value>(&bytes).with_context(|| {
2933                        format!("invalid schema JSON in {}:{}", archive_path.display(), rel)
2934                    })?;
2935                    Ok(Some(value))
2936                }
2937                Err(zip::result::ZipError::FileNotFound) => Ok(None),
2938                Err(err) => Err(anyhow!(err)).with_context(|| {
2939                    format!(
2940                        "failed to read schema `{}` from {}",
2941                        rel,
2942                        archive_path.display()
2943                    )
2944                }),
2945            }
2946        } else {
2947            Ok(None)
2948        }
2949    }
2950
2951    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
2952        &self.metadata.secret_requirements
2953    }
2954
2955    pub fn missing_secrets(
2956        &self,
2957        tenant_ctx: &TypesTenantCtx,
2958    ) -> Vec<greentic_types::SecretRequirement> {
2959        let env = tenant_ctx.env.as_str().to_string();
2960        let tenant = tenant_ctx.tenant.as_str().to_string();
2961        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
2962        self.required_secrets()
2963            .iter()
2964            .filter(|req| {
2965                // scope must match current context if provided
2966                if let Some(scope) = &req.scope {
2967                    if scope.env != env {
2968                        return false;
2969                    }
2970                    if scope.tenant != tenant {
2971                        return false;
2972                    }
2973                    if let Some(ref team_req) = scope.team
2974                        && team.as_ref() != Some(team_req)
2975                    {
2976                        return false;
2977                    }
2978                }
2979                let ctx = self.config.tenant_ctx();
2980                read_secret_blocking(
2981                    &self.secrets,
2982                    &ctx,
2983                    &self.metadata.pack_id,
2984                    canonicalize_secret_key(req.key.as_str()).as_str(),
2985                )
2986                .is_err()
2987            })
2988            .cloned()
2989            .collect()
2990    }
2991
2992    pub fn for_component_test(
2993        components: Vec<(String, PathBuf)>,
2994        flows: HashMap<String, FlowIR>,
2995        pack_id: &str,
2996        config: Arc<HostConfig>,
2997    ) -> Result<Self> {
2998        let engine = Engine::default();
2999        let engine_profile =
3000            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
3001        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
3002        let mut component_map = HashMap::new();
3003        for (name, path) in components {
3004            if !path.exists() {
3005                bail!("component artifact missing: {}", path.display());
3006            }
3007            let wasm_bytes = std::fs::read(&path)?;
3008            let component =
3009                Arc::new(Component::from_binary(&engine, &wasm_bytes).map_err(|err| {
3010                    anyhow!("failed to compile component {}: {err}", path.display())
3011                })?);
3012            component_map.insert(
3013                name.clone(),
3014                PackComponent {
3015                    name,
3016                    version: "0.0.0".into(),
3017                    component,
3018                },
3019            );
3020        }
3021
3022        let mut flow_map = HashMap::new();
3023        let mut descriptors = Vec::new();
3024        for (id, ir) in flows {
3025            let flow_type = ir.flow_type.clone();
3026            let flow = flow_ir_to_flow(ir)?;
3027            flow_map.insert(id.clone(), flow);
3028            descriptors.push(FlowDescriptor {
3029                id: id.clone(),
3030                flow_type,
3031                pack_id: pack_id.to_string(),
3032                profile: "test".into(),
3033                version: "0.0.0".into(),
3034                description: None,
3035            });
3036        }
3037        let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
3038        let metadata = PackMetadata {
3039            pack_id: pack_id.to_string(),
3040            version: "0.0.0".into(),
3041            entry_flows,
3042            secret_requirements: Vec::new(),
3043        };
3044        let flows_cache = PackFlows {
3045            descriptors: descriptors.clone(),
3046            flows: flow_map,
3047            metadata: metadata.clone(),
3048        };
3049
3050        Ok(Self {
3051            path: PathBuf::new(),
3052            archive_path: None,
3053            config,
3054            engine,
3055            metadata,
3056            manifest: None,
3057            legacy_manifest: None,
3058            component_manifests: HashMap::new(),
3059            mocks: None,
3060            flows: Some(flows_cache),
3061            components: component_map,
3062            http_client: Arc::clone(&HTTP_CLIENT),
3063            pre_cache: Mutex::new(HashMap::new()),
3064            session_store: None,
3065            state_store: None,
3066            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
3067            assets_tempdir: None,
3068            provider_registry: RwLock::new(None),
3069            identify_hint_cache: RwLock::new(HashMap::new()),
3070            secrets: crate::secrets::default_manager()?,
3071            oauth_config: None,
3072            cache,
3073            runtime_config_non_secret: None,
3074            runtime_refs: None,
3075        })
3076    }
3077}
3078
3079fn normalize_schema_ref(schema_ref: &str) -> Result<String> {
3080    let candidate = schema_ref.trim();
3081    if candidate.is_empty() {
3082        bail!("schema ref cannot be empty");
3083    }
3084    let path = Path::new(candidate);
3085    if path.is_absolute() {
3086        bail!("schema ref must be relative: {}", schema_ref);
3087    }
3088    let mut normalized = PathBuf::new();
3089    for component in path.components() {
3090        match component {
3091            std::path::Component::Normal(part) => normalized.push(part),
3092            std::path::Component::CurDir => {}
3093            _ => bail!("schema ref must not contain traversal: {}", schema_ref),
3094        }
3095    }
3096    let normalized = normalized
3097        .to_str()
3098        .map(ToString::to_string)
3099        .ok_or_else(|| anyhow!("schema ref must be valid UTF-8"))?;
3100    if normalized.is_empty() {
3101        bail!("schema ref cannot normalize to empty path");
3102    }
3103    Ok(normalized)
3104}
3105
/// Reports whether `path` carries a `gtpack` file extension.
///
/// The comparison ignores ASCII case; paths without an extension, or whose
/// extension is not valid UTF-8, yield `false`.
fn path_is_gtpack(path: &Path) -> bool {
    matches!(
        path.extension().and_then(|ext| ext.to_str()),
        Some(ext) if ext.eq_ignore_ascii_case("gtpack")
    )
}
3112
3113fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
3114    let message = err.to_string();
3115    message.contains("no exported instance named")
3116        && message.contains(&format!("greentic:component/node@{version}"))
3117}
3118
/// Flow catalogue resolved for a pack: the flows themselves plus the
/// descriptors and pack metadata that accompany them.
struct PackFlows {
    /// One descriptor per flow (id, flow type, pack id, profile, version).
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Metadata of the pack the flows belong to.
    metadata: PackMetadata,
}
3124
/// Manifest extension keys that are recognised as carrying runtime flow
/// definitions. `flows_from_runtime_extension` uses the first manifest
/// extension whose key equals any of these ids.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
3130
/// Object-shaped runtime flow extension payload: `{ "flows": [...] }`.
/// This is the first shape `decode_runtime_flow_extension` tries.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// Runtime flows carried by the bundle.
    flows: Vec<RuntimeFlow>,
}
3135
/// Serialized form of a flow as found in a runtime flow extension; converted
/// into a `Flow` by `runtime_flow_to_flow`.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow id; must parse as a `FlowId` during conversion.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Optional schema version; conversion falls back to `"1.0"` when absent.
    #[serde(default)]
    schema_version: Option<String>,
    /// Optional start node; becomes the `default` entrypoint when
    /// `entrypoints` is empty.
    #[serde(default)]
    start: Option<String>,
    /// Named entrypoints; defaults to an empty map.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by node id.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; conversion uses the default value when absent.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
3151
/// Serialized form of a single node inside a `RuntimeFlow`.
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component id; also accepted under the key `component`.
    #[serde(alias = "component")]
    component_id: String,
    /// Optional operation name; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Operation payload; also accepted under `payload` or `input`.
    /// Defaults to JSON null when absent.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Node configuration. When non-null, conversion wraps the payload as
    /// `{ "input": <payload>, "config": <config> }`.
    #[serde(default)]
    config: Value,
    /// Optional routing; conversion falls back to `Routing::End`.
    #[serde(default)]
    routing: Option<Routing>,
    /// Optional telemetry hints; conversion falls back to the default value.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
3167
3168fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
3169    if bytes.is_empty() {
3170        return Ok(Value::Null);
3171    }
3172    serde_json::from_slice(&bytes).or_else(|_| {
3173        String::from_utf8(bytes)
3174            .map(Value::String)
3175            .map_err(|err| anyhow!(err))
3176    })
3177}
3178
/// Decides whether an error message means "this component does not export the
/// instance-identity world", as opposed to a genuine failure.
///
/// Callers use a `true` result to treat the component as not opting in and to
/// fall back to the operator's statically declared instance (the same idea as
/// the legacy/path schema-core fallback in `invoke_provider`).
///
/// The check is deliberately narrow. The message must contain BOTH:
/// - a generic wasmtime marker: `"no exported instance named"` or
///   `"no exported function named"`, and
/// - an identity-specific name: `"instance-identity-api"`,
///   `"identify-instance"`, `"instance-identity-describe-api"`, or
///   `"describe-identify-instance"`.
///
/// Any other failure — for example a malformed signature on an exported
/// identity function — does not match and therefore surfaces as a hard error.
fn is_missing_export_error(message: &str) -> bool {
    const BROAD_MARKERS: [&str; 2] = [
        "no exported instance named",
        "no exported function named",
    ];
    const IDENTITY_SEGMENTS: [&str; 4] = [
        "instance-identity-api",
        "identify-instance",
        "instance-identity-describe-api",
        "describe-identify-instance",
    ];
    let mentions_any = |needles: &[&str]| needles.iter().any(|needle| message.contains(*needle));
    mentions_any(&BROAD_MARKERS) && mentions_any(&IDENTITY_SEGMENTS)
}
3203
3204/// Build the M1 IID.4d wrapper payload (`{ headers, body }`) scoped per
3205/// the provider's [`IdentifyInstanceHint`].
3206///
3207/// - `Some(hint)` ⇒ headers are filtered to ONLY those whose lowercase
3208///   name appears in [`hint.header_names()`](IdentifyInstanceHint::header_names).
3209///   A hint with no `Header` sources yields `"headers": []` — the
3210///   component is declaring that it identifies from the body alone.
3211/// - `None` ⇒ headers pass through unfiltered. The caller is responsible
3212///   for prefiltering (greentic-start applies a global allowlist at the
3213///   ingress boundary), so back-compat with not-yet-hinted providers
3214///   matches the pre-PR-B2 behavior exactly: every probed component
3215///   receives every allowlisted header.
3216///
3217/// `body` is forwarded verbatim regardless of hint shape. Body-path
3218/// short-circuit (using the hint's `BodyPath { json_pointer }` to skip
3219/// invoking `identify-instance` entirely) is a deliberately-deferred
3220/// Phase D follow-up — the current pass scopes the header allowlist only.
3221fn build_scoped_identify_payload(
3222    headers: &[(String, String)],
3223    body: &Value,
3224    hint: Option<&IdentifyInstanceHint>,
3225) -> Vec<u8> {
3226    let scoped_headers: Vec<&(String, String)> = match hint {
3227        // Hints carry 1-3 source headers in practice; a linear scan beats
3228        // a HashSet for that size (no hash + no allocation).
3229        Some(hint) => {
3230            let allowed = hint.header_names();
3231            headers
3232                .iter()
3233                .filter(|(name, _)| allowed.contains(&name.as_str()))
3234                .collect()
3235        }
3236        None => headers.iter().collect(),
3237    };
3238    let wrapper = serde_json::json!({
3239        "headers": scoped_headers
3240            .iter()
3241            .map(|(name, value)| serde_json::json!({ "name": name, "value": value }))
3242            .collect::<Vec<_>>(),
3243        "body": body,
3244    });
3245    serde_json::to_vec(&wrapper).expect("wrapper payload always serializes")
3246}
3247
// Unit tests for `build_scoped_identify_payload`: each test builds a wrapper
// payload, parses it back as JSON, and checks the `headers` (and, where
// relevant, `body`) members.
#[cfg(test)]
mod build_scoped_identify_payload_tests {
    use super::*;
    use crate::identify_hint::HintSource;
    use serde_json::json;

    // Shorthand for constructing a hint from a list of sources.
    fn hint(sources: Vec<HintSource>) -> IdentifyInstanceHint {
        IdentifyInstanceHint { sources }
    }

    #[test]
    fn unhinted_passes_all_input_headers_through() {
        // Back-compat: components without describe-identify-instance must
        // continue to see every header the caller (greentic-start)
        // allowlisted. Pre-PR-B2 behavior verbatim.
        let headers = vec![
            (
                "x-telegram-bot-api-secret-token".into(),
                "telegram-tok".into(),
            ),
            ("x-future-routing-tag".into(), "abc".into()),
        ];
        let body = json!({ "update_id": 1 });
        let bytes = build_scoped_identify_payload(&headers, &body, None);
        let parsed: Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(
            parsed["headers"],
            json!([
                { "name": "x-telegram-bot-api-secret-token", "value": "telegram-tok" },
                { "name": "x-future-routing-tag", "value": "abc" }
            ])
        );
        assert_eq!(parsed["body"], body);
    }

    #[test]
    fn header_hint_filters_to_declared_names_only() {
        // Telegram-shape hint: declares one header, sees only that one.
        // Other allowlisted headers (e.g. a future Slack signature) MUST
        // NOT leak into the Telegram probe.
        let h = hint(vec![HintSource::Header {
            name: "x-telegram-bot-api-secret-token".into(),
        }]);
        let headers = vec![
            (
                "x-telegram-bot-api-secret-token".into(),
                "telegram-tok".into(),
            ),
            ("x-slack-signature".into(), "v0=sig".into()),
        ];
        let body = json!({});
        let bytes = build_scoped_identify_payload(&headers, &body, Some(&h));
        let parsed: Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(
            parsed["headers"],
            json!([
                { "name": "x-telegram-bot-api-secret-token", "value": "telegram-tok" }
            ])
        );
    }

    #[test]
    fn hints_without_header_sources_drop_all_headers() {
        // Body-path-only (Teams-shape) and degenerate-empty hints both yield
        // an empty `Header` source set; the wrapper MUST carry no headers
        // either way. Passing Telegram's secret token through to either is
        // the exact blast-radius bug PR-B2 closes.
        let headers = vec![(
            "x-telegram-bot-api-secret-token".into(),
            "should-not-leak".into(),
        )];
        let body = json!({ "anything": true });
        for h in [
            hint(vec![HintSource::BodyPath {
                json_pointer: "/recipient/id".into(),
            }]),
            hint(vec![]),
        ] {
            let bytes = build_scoped_identify_payload(&headers, &body, Some(&h));
            let parsed: Value = serde_json::from_slice(&bytes).unwrap();
            assert_eq!(parsed["headers"], json!([]), "hint={:?}", h.sources);
            assert_eq!(parsed["body"], body);
        }
    }

    #[test]
    fn header_filter_preserves_input_order_and_dups() {
        // Multi-value headers and ordering matter to debuggability
        // (operators reading the wrapper from a probe should see the
        // headers in the same order they arrived). Filter is a
        // retain-only operation; no sort, no dedup.
        let h = hint(vec![HintSource::Header {
            name: "x-route".into(),
        }]);
        let headers = vec![
            ("x-route".into(), "a".into()),
            ("x-other".into(), "skip".into()),
            ("x-route".into(), "b".into()),
        ];
        let body = json!({});
        let bytes = build_scoped_identify_payload(&headers, &body, Some(&h));
        let parsed: Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(
            parsed["headers"],
            json!([
                { "name": "x-route", "value": "a" },
                { "name": "x-route", "value": "b" }
            ])
        );
    }
}
3359
3360impl PackFlows {
3361    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
3362        if let Some(flows) = flows_from_runtime_extension(&manifest) {
3363            return flows;
3364        }
3365        let descriptors = manifest
3366            .flows
3367            .iter()
3368            .map(|entry| FlowDescriptor {
3369                id: entry.id.as_str().to_string(),
3370                flow_type: flow_kind_to_str(entry.kind).to_string(),
3371                pack_id: manifest.pack_id.as_str().to_string(),
3372                profile: manifest.pack_id.as_str().to_string(),
3373                version: manifest.version.to_string(),
3374                description: None,
3375            })
3376            .collect();
3377        let mut flows = HashMap::new();
3378        for entry in &manifest.flows {
3379            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
3380        }
3381        Self {
3382            metadata: PackMetadata::from_manifest(&manifest),
3383            descriptors,
3384            flows,
3385        }
3386    }
3387}
3388
3389fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
3390    let extensions = manifest.extensions.as_ref()?;
3391    let extension = extensions.iter().find_map(|(key, ext)| {
3392        if RUNTIME_FLOW_EXTENSION_IDS
3393            .iter()
3394            .any(|candidate| candidate == key)
3395        {
3396            Some(ext)
3397        } else {
3398            None
3399        }
3400    })?;
3401    let runtime_flows = match decode_runtime_flow_extension(extension) {
3402        Some(flows) if !flows.is_empty() => flows,
3403        _ => return None,
3404    };
3405
3406    let descriptors = runtime_flows
3407        .iter()
3408        .map(|flow| FlowDescriptor {
3409            id: flow.id.as_str().to_string(),
3410            flow_type: flow_kind_to_str(flow.kind).to_string(),
3411            pack_id: manifest.pack_id.as_str().to_string(),
3412            profile: manifest.pack_id.as_str().to_string(),
3413            version: manifest.version.to_string(),
3414            description: None,
3415        })
3416        .collect::<Vec<_>>();
3417    let flows = runtime_flows
3418        .into_iter()
3419        .map(|flow| (flow.id.as_str().to_string(), flow))
3420        .collect();
3421
3422    Some(PackFlows {
3423        metadata: PackMetadata::from_manifest(manifest),
3424        descriptors,
3425        flows,
3426    })
3427}
3428
3429fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
3430    let value = match extension.inline.as_ref()? {
3431        ExtensionInline::Other(value) => value.clone(),
3432        _ => return None,
3433    };
3434
3435    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
3436        return Some(collect_runtime_flows(bundle.flows));
3437    }
3438
3439    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
3440        return Some(collect_runtime_flows(flows));
3441    }
3442
3443    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
3444        return Some(flows);
3445    }
3446
3447    warn!(
3448        extension = %extension.kind,
3449        version = %extension.version,
3450        "runtime flow extension present but could not be decoded"
3451    );
3452    None
3453}
3454
3455fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
3456    flows
3457        .into_iter()
3458        .filter_map(|flow| match runtime_flow_to_flow(flow) {
3459            Ok(flow) => Some(flow),
3460            Err(err) => {
3461                warn!(error = %err, "failed to decode runtime flow");
3462                None
3463            }
3464        })
3465        .collect()
3466}
3467
3468fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
3469    let flow_id = FlowId::from_str(&runtime.id)
3470        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
3471    let mut entrypoints = runtime.entrypoints;
3472    if entrypoints.is_empty()
3473        && let Some(start) = &runtime.start
3474    {
3475        entrypoints.insert("default".into(), Value::String(start.clone()));
3476    }
3477
3478    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
3479    for (id, node) in runtime.nodes {
3480        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
3481        let component_id = ComponentId::from_str(&node.component_id)
3482            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
3483        let operation_payload = if node.config.is_null() {
3484            node.operation_payload
3485        } else {
3486            serde_json::json!({
3487                "input": node.operation_payload,
3488                "config": node.config,
3489            })
3490        };
3491        let component = FlowComponentRef {
3492            id: component_id,
3493            pack_alias: None,
3494            operation: node.operation_name,
3495        };
3496        let routing = node.routing.unwrap_or(Routing::End);
3497        let telemetry = node.telemetry.unwrap_or_default();
3498        nodes.insert(
3499            node_id.clone(),
3500            Node {
3501                id: node_id,
3502                component,
3503                input: InputMapping {
3504                    mapping: operation_payload,
3505                },
3506                output: OutputMapping {
3507                    mapping: Value::Null,
3508                },
3509                err_map: None,
3510                routing,
3511                telemetry,
3512            },
3513        );
3514    }
3515
3516    Ok(Flow {
3517        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
3518        id: flow_id,
3519        kind: runtime.kind,
3520        entrypoints,
3521        nodes,
3522        metadata: runtime.metadata.unwrap_or_default(),
3523    })
3524}
3525
3526fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
3527    match kind {
3528        greentic_types::FlowKind::Messaging => "messaging",
3529        greentic_types::FlowKind::Event => "event",
3530        greentic_types::FlowKind::ComponentConfig => "component-config",
3531        greentic_types::FlowKind::Job => "job",
3532        greentic_types::FlowKind::Http => "http",
3533    }
3534}
3535
3536fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
3537    let mut file = archive
3538        .by_name(name)
3539        .with_context(|| format!("entry {name} missing from archive"))?;
3540    let mut buf = Vec::new();
3541    file.read_to_end(&mut buf)?;
3542    Ok(buf)
3543}
3544
3545fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
3546    for node in doc.nodes.values_mut() {
3547        let Some((component_ref, payload)) = node
3548            .raw
3549            .iter()
3550            .next()
3551            .map(|(key, value)| (key.clone(), value.clone()))
3552        else {
3553            continue;
3554        };
3555        if component_ref.starts_with("emit.") {
3556            node.operation = Some(component_ref);
3557            node.payload = payload;
3558            node.raw.clear();
3559            continue;
3560        }
3561        let (target_component, operation, input, config) =
3562            infer_component_exec(&payload, &component_ref);
3563        let mut payload_obj = serde_json::Map::new();
3564        // component.exec is meta; ensure the payload carries the actual target component.
3565        payload_obj.insert("component".into(), Value::String(target_component));
3566        payload_obj.insert("operation".into(), Value::String(operation));
3567        payload_obj.insert("input".into(), input);
3568        if let Some(cfg) = config {
3569            payload_obj.insert("config".into(), cfg);
3570        }
3571        node.operation = Some("component.exec".to_string());
3572        node.payload = Value::Object(payload_obj);
3573        node.raw.clear();
3574    }
3575    doc
3576}
3577
3578fn infer_component_exec(
3579    payload: &Value,
3580    component_ref: &str,
3581) -> (String, String, Value, Option<Value>) {
3582    let default_op = if component_ref.starts_with("templating.") {
3583        "render"
3584    } else {
3585        "invoke"
3586    }
3587    .to_string();
3588
3589    if let Value::Object(map) = payload {
3590        let has_embedded_component =
3591            map.get("component").is_some() || map.get("component_ref").is_some();
3592        let op = map
3593            .get("op")
3594            .or_else(|| map.get("operation"))
3595            .and_then(Value::as_str)
3596            .map(|s| s.to_string())
3597            .unwrap_or_else(|| {
3598                if has_embedded_component {
3599                    component_ref.to_string()
3600                } else {
3601                    default_op.clone()
3602                }
3603            });
3604
3605        let mut input = map.clone();
3606        let config = input.remove("config");
3607        let canonical_input = if has_embedded_component {
3608            input.get("input").cloned()
3609        } else {
3610            None
3611        };
3612        let component = input
3613            .get("component")
3614            .or_else(|| input.get("component_ref"))
3615            .and_then(Value::as_str)
3616            .map(|s| s.to_string())
3617            .unwrap_or_else(|| component_ref.to_string());
3618        input.remove("component");
3619        input.remove("component_ref");
3620        input.remove("op");
3621        input.remove("operation");
3622        let input = canonical_input.unwrap_or(Value::Object(input));
3623        return (component, op, input, config);
3624    }
3625
3626    (component_ref.to_string(), default_op, payload.clone(), None)
3627}
3628
/// Identity of a component a pack declares, as produced by `component_specs`.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component id (or, for legacy manifests and id-less entries, its name).
    id: String,
    /// Component version; `"0.0.0"` when derived from a lock or sources table.
    version: String,
    /// Wasm file path taken from a legacy manifest entry; `None` otherwise.
    legacy_path: Option<String>,
}
3635
/// Where a component comes from and how its artifact should be checked.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Resolved digest of the component, when known.
    digest: Option<String>,
    /// Source reference the component was resolved from.
    source: ComponentSourceRef,
    /// Whether the wasm is bundled inline or must be fetched remotely.
    artifact: ComponentArtifactLocation,
    /// Expected SHA-256 of the wasm bytes, when one was provided.
    expected_wasm_sha256: Option<String>,
    /// When set, digest verification is skipped for this component.
    // NOTE(review): the consumers of these two verification fields are outside
    // this view — confirm exact semantics against the loader.
    skip_digest_verification: bool,
}
3644
/// Location of a component's wasm artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// The wasm is shipped with the pack at `wasm_path`.
    Inline { wasm_path: String },
    /// The wasm is not shipped with the pack.
    Remote,
}
3650
/// Deserialized `pack.lock` / `pack.lock.json` document.
#[derive(Clone, Debug, Deserialize)]
struct PackLockV1 {
    /// Lock schema version; `load_pack_lock` rejects anything other than 1.
    schema_version: u32,
    /// Locked components.
    components: Vec<PackLockComponent>,
}
3656
/// One component entry of a pack lock. Every field except `name` is optional
/// and defaults to `None` when absent; several fields exist in a current and
/// a legacy spelling.
#[derive(Clone, Debug, Deserialize)]
struct PackLockComponent {
    /// Component name; used as the component id when `component_id` is absent.
    name: String,
    /// Source reference, read from the key `source_ref`.
    #[serde(default, rename = "source_ref")]
    source_ref: Option<String>,
    /// Legacy source reference, read from the key `ref`.
    #[serde(default, rename = "ref")]
    legacy_ref: Option<String>,
    /// Explicit component id, if the lock records one.
    #[serde(default)]
    component_id: Option<ComponentId>,
    /// Whether the component is bundled with the pack.
    #[serde(default)]
    bundled: Option<bool>,
    /// Path of the bundled artifact, read from the key `bundled_path`.
    #[serde(default, rename = "bundled_path")]
    bundled_path: Option<String>,
    /// Legacy artifact path, read from the key `path`.
    #[serde(default, rename = "path")]
    legacy_path: Option<String>,
    /// SHA-256 of the wasm artifact.
    #[serde(default)]
    wasm_sha256: Option<String>,
    /// Legacy SHA-256 field, read from the key `sha256`.
    #[serde(default, rename = "sha256")]
    legacy_sha256: Option<String>,
    /// Resolved digest recorded by the lock.
    #[serde(default)]
    resolved_digest: Option<String>,
    /// Digest recorded under the key `digest`.
    #[serde(default)]
    digest: Option<String>,
}
3681
3682fn component_specs(
3683    manifest: Option<&greentic_types::PackManifest>,
3684    legacy_manifest: Option<&legacy_pack::PackManifest>,
3685    component_sources: Option<&ComponentSourcesV1>,
3686    pack_lock: Option<&PackLockV1>,
3687) -> Vec<ComponentSpec> {
3688    if let Some(manifest) = manifest {
3689        if !manifest.components.is_empty() {
3690            return manifest
3691                .components
3692                .iter()
3693                .map(|entry| ComponentSpec {
3694                    id: entry.id.as_str().to_string(),
3695                    version: entry.version.to_string(),
3696                    legacy_path: None,
3697                })
3698                .collect();
3699        }
3700        if let Some(lock) = pack_lock {
3701            let mut seen = HashSet::new();
3702            let mut specs = Vec::new();
3703            for entry in &lock.components {
3704                let id = entry
3705                    .component_id
3706                    .as_ref()
3707                    .map(|id| id.as_str())
3708                    .unwrap_or(entry.name.as_str());
3709                if seen.insert(id.to_string()) {
3710                    specs.push(ComponentSpec {
3711                        id: id.to_string(),
3712                        version: "0.0.0".to_string(),
3713                        legacy_path: None,
3714                    });
3715                }
3716            }
3717            return specs;
3718        }
3719        if let Some(sources) = component_sources {
3720            let mut seen = HashSet::new();
3721            let mut specs = Vec::new();
3722            for entry in &sources.components {
3723                let id = entry
3724                    .component_id
3725                    .as_ref()
3726                    .map(|id| id.as_str())
3727                    .unwrap_or(entry.name.as_str());
3728                if seen.insert(id.to_string()) {
3729                    specs.push(ComponentSpec {
3730                        id: id.to_string(),
3731                        version: "0.0.0".to_string(),
3732                        legacy_path: None,
3733                    });
3734                }
3735            }
3736            return specs;
3737        }
3738    }
3739    if let Some(legacy_manifest) = legacy_manifest {
3740        return legacy_manifest
3741            .components
3742            .iter()
3743            .map(|entry| ComponentSpec {
3744                id: entry.name.clone(),
3745                version: entry.version.to_string(),
3746                legacy_path: Some(entry.file_wasm.clone()),
3747            })
3748            .collect();
3749    }
3750    Vec::new()
3751}
3752
3753fn component_sources_table(
3754    sources: Option<&ComponentSourcesV1>,
3755) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
3756    let Some(sources) = sources else {
3757        return Ok(None);
3758    };
3759    let mut table = HashMap::new();
3760    for entry in &sources.components {
3761        let artifact = match &entry.artifact {
3762            ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
3763                wasm_path: wasm_path.clone(),
3764            },
3765            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
3766        };
3767        let info = ComponentSourceInfo {
3768            digest: Some(entry.resolved.digest.clone()),
3769            source: entry.source.clone(),
3770            artifact,
3771            expected_wasm_sha256: None,
3772            skip_digest_verification: false,
3773        };
3774        if let Some(component_id) = entry.component_id.as_ref() {
3775            table.insert(component_id.as_str().to_string(), info.clone());
3776        }
3777        table.insert(entry.name.clone(), info);
3778    }
3779    Ok(Some(table))
3780}
3781
3782fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
3783    let lock_path = if path.is_dir() {
3784        let candidate = path.join("pack.lock");
3785        if candidate.exists() {
3786            Some(candidate)
3787        } else {
3788            let candidate = path.join("pack.lock.json");
3789            candidate.exists().then_some(candidate)
3790        }
3791    } else {
3792        None
3793    };
3794    let Some(lock_path) = lock_path else {
3795        return Ok(None);
3796    };
3797    let raw = std::fs::read_to_string(&lock_path)
3798        .with_context(|| format!("failed to read {}", lock_path.display()))?;
3799    let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
3800    if lock.schema_version != 1 {
3801        bail!("pack.lock schema_version must be 1");
3802    }
3803    Ok(Some(lock))
3804}
3805
/// Lists the directories in which a `pack.lock` for this pack should be looked up.
///
/// * For a directory pack (`is_dir`), the pack directory itself is the only root.
/// * Otherwise the roots are the parent and grandparent (in that order, when they exist) of
///   `archive_hint` if one is given, or of `pack_path` if not. When `archive_hint` is given
///   but has no parent, the result is empty; `pack_path` is not used as a fallback.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    let anchor = archive_hint.unwrap_or(pack_path);
    // `ancestors()` yields the path itself first, then parent, grandparent, ...
    anchor
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
3830
3831fn normalize_sha256(digest: &str) -> Result<String> {
3832    let trimmed = digest.trim();
3833    if trimmed.is_empty() {
3834        bail!("sha256 digest cannot be empty");
3835    }
3836    if let Some(stripped) = trimmed.strip_prefix("sha256:") {
3837        if stripped.is_empty() {
3838            bail!("sha256 digest must include hex bytes after sha256:");
3839        }
3840        return Ok(trimmed.to_string());
3841    }
3842    if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
3843        return Ok(format!("sha256:{trimmed}"));
3844    }
3845    bail!("sha256 digest must be hex or sha256:<hex>");
3846}
3847
3848fn component_sources_table_from_pack_lock(
3849    lock: &PackLockV1,
3850    allow_missing_hash: bool,
3851) -> Result<HashMap<String, ComponentSourceInfo>> {
3852    let mut table = HashMap::new();
3853    let mut names = HashSet::new();
3854    for entry in &lock.components {
3855        if !names.insert(entry.name.clone()) {
3856            bail!(
3857                "pack.lock contains duplicate component name `{}`",
3858                entry.name
3859            );
3860        }
3861        let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
3862            (Some(primary), Some(legacy)) => {
3863                if primary != legacy {
3864                    bail!(
3865                        "pack.lock component {} has conflicting refs: {} vs {}",
3866                        entry.name,
3867                        primary,
3868                        legacy
3869                    );
3870                }
3871                primary.as_str()
3872            }
3873            (Some(primary), None) => primary.as_str(),
3874            (None, Some(legacy)) => legacy.as_str(),
3875            (None, None) => {
3876                bail!("pack.lock component {} missing source_ref", entry.name);
3877            }
3878        };
3879        let source: ComponentSourceRef = source_ref
3880            .parse()
3881            .with_context(|| format!("invalid component ref `{}`", source_ref))?;
3882        let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
3883            (Some(primary), Some(legacy)) => {
3884                if primary != legacy {
3885                    bail!(
3886                        "pack.lock component {} has conflicting bundled paths: {} vs {}",
3887                        entry.name,
3888                        primary,
3889                        legacy
3890                    );
3891                }
3892                Some(primary.clone())
3893            }
3894            (Some(primary), None) => Some(primary.clone()),
3895            (None, Some(legacy)) => Some(legacy.clone()),
3896            (None, None) => None,
3897        };
3898        let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
3899        let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
3900            let wasm_path = bundled_path.ok_or_else(|| {
3901                anyhow!(
3902                    "pack.lock component {} marked bundled but bundled_path is missing",
3903                    entry.name
3904                )
3905            })?;
3906            let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
3907                (Some(primary), Some(legacy)) => {
3908                    if primary != legacy {
3909                        bail!(
3910                            "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
3911                            entry.name,
3912                            primary,
3913                            legacy
3914                        );
3915                    }
3916                    Some(primary.as_str())
3917                }
3918                (Some(primary), None) => Some(primary.as_str()),
3919                (None, Some(legacy)) => Some(legacy.as_str()),
3920                (None, None) => None,
3921            };
3922            let expected = match expected_raw {
3923                Some(value) => Some(normalize_sha256(value)?),
3924                None => None,
3925            };
3926            if expected.is_none() && !allow_missing_hash {
3927                bail!(
3928                    "pack.lock component {} missing wasm_sha256 for bundled component",
3929                    entry.name
3930                );
3931            }
3932            (
3933                ComponentArtifactLocation::Inline { wasm_path },
3934                expected.clone(),
3935                expected,
3936                allow_missing_hash && expected_raw.is_none(),
3937            )
3938        } else {
3939            if source.is_tag() {
3940                bail!(
3941                    "component {} uses tag ref {} but is not bundled; rebuild the pack",
3942                    entry.name,
3943                    source
3944                );
3945            }
3946            let expected = entry
3947                .resolved_digest
3948                .as_deref()
3949                .or(entry.digest.as_deref())
3950                .ok_or_else(|| {
3951                    anyhow!(
3952                        "pack.lock component {} missing resolved_digest for remote component",
3953                        entry.name
3954                    )
3955                })?;
3956            (
3957                ComponentArtifactLocation::Remote,
3958                Some(normalize_digest(expected)),
3959                None,
3960                false,
3961            )
3962        };
3963        let info = ComponentSourceInfo {
3964            digest,
3965            source,
3966            artifact,
3967            expected_wasm_sha256,
3968            skip_digest_verification,
3969        };
3970        if let Some(component_id) = entry.component_id.as_ref() {
3971            let key = component_id.as_str().to_string();
3972            if table.contains_key(&key) {
3973                bail!(
3974                    "pack.lock contains duplicate component id `{}`",
3975                    component_id.as_str()
3976                );
3977            }
3978            table.insert(key, info.clone());
3979        }
3980        if entry.name
3981            != entry
3982                .component_id
3983                .as_ref()
3984                .map(|id| id.as_str())
3985                .unwrap_or("")
3986        {
3987            table.insert(entry.name.clone(), info);
3988        }
3989    }
3990    Ok(table)
3991}
3992
3993fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
3994    if let Some(path) = &spec.legacy_path {
3995        return root.join(path);
3996    }
3997    root.join("components").join(format!("{}.wasm", spec.id))
3998}
3999
/// Ensures a digest string carries an algorithm prefix.
///
/// Digests already starting with `sha256:` or `blake3:` are returned unchanged; anything else
/// is assumed to be a bare SHA-256 value and gets `sha256:` prepended. No validation, trimming
/// or case folding is performed.
fn normalize_digest(digest: &str) -> String {
    let already_tagged = ["sha256:", "blake3:"]
        .iter()
        .any(|prefix| digest.starts_with(*prefix));
    if already_tagged {
        digest.to_owned()
    } else {
        format!("sha256:{digest}")
    }
}
4007
4008fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
4009    if digest.starts_with("blake3:") {
4010        let hash = blake3::hash(bytes);
4011        return Ok(format!("blake3:{}", hash.to_hex()));
4012    }
4013    let mut hasher = sha2::Sha256::new();
4014    hasher.update(bytes);
4015    Ok(format!("sha256:{}", to_hex(&hasher.finalize())))
4016}
4017
4018fn compute_sha256_digest_for(bytes: &[u8]) -> String {
4019    let mut hasher = sha2::Sha256::new();
4020    hasher.update(bytes);
4021    format!("sha256:{}", to_hex(&hasher.finalize()))
4022}
4023
4024fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
4025    let wasm_digest = digest
4026        .map(normalize_digest)
4027        .unwrap_or_else(|| compute_sha256_digest_for(bytes));
4028    ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
4029}
4030
4031async fn compile_component_with_cache(
4032    cache: &CacheManager,
4033    engine: &Engine,
4034    digest: Option<&str>,
4035    bytes: Vec<u8>,
4036) -> Result<Arc<Component>> {
4037    let key = build_artifact_key(cache, digest, &bytes);
4038    cache.get_component(engine, &key, || Ok(bytes)).await
4039}
4040
4041fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
4042    let normalized_expected = normalize_digest(expected);
4043    let actual = compute_digest_for(bytes, &normalized_expected)?;
4044    if normalize_digest(&actual) != normalized_expected {
4045        bail!(
4046            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
4047        );
4048    }
4049    Ok(())
4050}
4051
4052fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
4053    let normalized_expected = normalize_sha256(expected)?;
4054    let actual = compute_sha256_digest_for(bytes);
4055    if actual != normalized_expected {
4056        bail!(
4057            "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
4058        );
4059    }
4060    Ok(())
4061}
4062
/// Renders `digest` as lowercase hexadecimal, two characters per byte.
///
/// Writes into a single preallocated buffer instead of formatting a temporary `String` for
/// every byte.
fn to_hex(digest: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(digest.len() * 2);
    for &byte in digest {
        // High nibble first, then low nibble; both indices are always < 16.
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
4066
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    /// A component that is not bundled and is referenced only by a tag must be rejected when
    /// the source table is built from the lock.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let tagged_remote = PackLockComponent {
            name: "templates".to_string(),
            component_id: None,
            source_ref: Some("oci://registry.test/templates:latest".to_string()),
            legacy_ref: None,
            bundled: Some(false),
            bundled_path: None,
            legacy_path: None,
            wasm_sha256: None,
            legacy_sha256: None,
            resolved_digest: None,
            digest: None,
        };
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![tagged_remote],
        };

        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
        let message = err.to_string();
        assert!(
            message.contains("tag ref") && message.contains("rebuild the pack"),
            "unexpected error: {err}"
        );
    }

    /// Loading an inline component whose bytes do not hash to the expected wasm SHA-256 must
    /// fail with a bundled digest mismatch.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let temp = TempDir::new().expect("temp dir");

        // Cache rooted inside the temp dir, profiled for a default engine.
        let engine = Engine::default();
        let cache = CacheManager::new(
            CacheConfig {
                root: temp.path().join("cache"),
                ..CacheConfig::default()
            },
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string()),
        );

        // Copy a real fixture component next to the cache so the inline path resolves.
        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
        let fixture_bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
        let wasm_path = temp.path().join("component.wasm");
        std::fs::write(&wasm_path, &fixture_bytes).expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::from([spec.id.clone()]);

        // The expected hash is deliberately wrong for the fixture bytes.
        let bogus_source = ComponentSourceInfo {
            digest: Some("sha256:deadbeef".to_string()),
            source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
            artifact: ComponentArtifactLocation::Inline {
                wasm_path: "component.wasm".to_string(),
            },
            expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
            skip_digest_verification: false,
        };
        let sources = HashMap::from([(spec.id.clone(), bogus_source)]);

        let mut loaded = HashMap::new();
        let outcome = rt.block_on(load_components_from_sources(
            &cache,
            &engine,
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(temp.path()),
            None,
        ));

        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
4156
#[cfg(test)]
mod pack_resolution_prop_tests {
    use super::*;
    use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
    use proptest::prelude::*;
    use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
    use std::collections::BTreeSet;
    use std::path::Path;
    use std::str::FromStr;

    /// How a scenario asks for a component; both variants carry the lookup key.
    #[derive(Clone, Debug)]
    enum ResolveRequest {
        ById(String),
        ByName(String),
    }

    /// Comparable snapshot of a successful lookup in the source table.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolvedComponent {
        key: String,
        source: String,
        artifact: String,
        digest: Option<String>,
        expected_wasm_sha256: Option<String>,
        skip_digest_verification: bool,
    }

    /// Comparable snapshot of a failed lookup: a stable code, the raw message and the key.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolveError {
        code: String,
        message: String,
        context_key: String,
    }

    /// One generated test case.
    #[derive(Clone, Debug)]
    struct Scenario {
        pack_lock: Option<PackLockV1>,
        component_sources: Option<ComponentSourcesV1>,
        request: ResolveRequest,
        expected_sha256: Option<String>,
        bytes: Vec<u8>,
    }

    /// Resolves `request` against the lock when one is given, otherwise against the
    /// component-sources extension, mapping every failure to a coded `ResolveError`.
    fn resolve_component_test(
        sources: Option<&ComponentSourcesV1>,
        lock: Option<&PackLockV1>,
        request: &ResolveRequest,
    ) -> Result<ResolvedComponent, ResolveError> {
        let key = request_key(request);
        let failure = |code: &str, message: String| ResolveError {
            code: code.to_string(),
            message,
            context_key: key.to_string(),
        };

        let table = match lock {
            Some(lock) => component_sources_table_from_pack_lock(lock, false).map_err(|err| {
                let message = err.to_string();
                failure(classify_pack_lock_error(&message), message)
            })?,
            None => component_sources_table(sources)
                .map_err(|err| failure("component_sources_error", err.to_string()))?
                .ok_or_else(|| {
                    failure(
                        "missing_component_sources",
                        "component sources not provided".to_string(),
                    )
                })?,
        };

        let found = table.get(key).ok_or_else(|| {
            failure("component_not_found", format!("component {key} not found"))
        })?;

        let artifact = match &found.artifact {
            ComponentArtifactLocation::Inline { .. } => "inline",
            ComponentArtifactLocation::Remote => "remote",
        };
        Ok(ResolvedComponent {
            key: key.to_string(),
            source: found.source.to_string(),
            artifact: artifact.to_string(),
            digest: found.digest.clone(),
            expected_wasm_sha256: found.expected_wasm_sha256.clone(),
            skip_digest_verification: found.skip_digest_verification,
        })
    }

    /// Returns the lookup key carried by either request variant.
    fn request_key(request: &ResolveRequest) -> &str {
        let (ResolveRequest::ById(value) | ResolveRequest::ByName(value)) = request;
        value.as_str()
    }

    /// Maps a pack.lock error message to a stable code.
    ///
    /// Rules are tried in order; a rule matches when the message contains all of its needles.
    fn classify_pack_lock_error(message: &str) -> &'static str {
        const RULES: &[(&[&str], &str)] = &[
            (&["duplicate component name"], "duplicate_name"),
            (&["duplicate component id"], "duplicate_id"),
            (&["conflicting refs"], "conflicting_ref"),
            (&["conflicting bundled paths"], "conflicting_bundled_path"),
            (&["conflicting wasm_sha256"], "conflicting_wasm_sha256"),
            (&["missing source_ref"], "missing_source_ref"),
            (
                &["marked bundled but bundled_path is missing"],
                "missing_bundled_path",
            ),
            (&["missing wasm_sha256"], "missing_wasm_sha256"),
            (&["tag ref", "not bundled"], "tag_ref_requires_bundle"),
            (&["missing resolved_digest"], "missing_resolved_digest"),
            (&["invalid component ref"], "invalid_component_ref"),
            (&["sha256 digest"], "invalid_sha256"),
        ];
        RULES
            .iter()
            .find(|(needles, _)| needles.iter().all(|needle| message.contains(*needle)))
            .map_or("unknown_error", |(_, code)| *code)
    }

    /// Every error code `resolve_component_test` is allowed to produce.
    fn known_error_codes() -> BTreeSet<&'static str> {
        BTreeSet::from([
            "component_sources_error",
            "missing_component_sources",
            "component_not_found",
            "duplicate_name",
            "duplicate_id",
            "conflicting_ref",
            "conflicting_bundled_path",
            "conflicting_wasm_sha256",
            "missing_source_ref",
            "missing_bundled_path",
            "missing_wasm_sha256",
            "tag_ref_requires_bundle",
            "missing_resolved_digest",
            "invalid_component_ref",
            "invalid_sha256",
            "unknown_error",
        ])
    }

    /// Proptest configuration: case count from `PROPTEST_CASES` (default 128), with failure
    /// persistence disabled.
    fn proptest_config() -> ProptestConfig {
        let cases = match std::env::var("PROPTEST_CASES") {
            Ok(raw) => raw.parse::<u32>().unwrap_or(128),
            Err(_) => 128,
        };
        ProptestConfig {
            cases,
            failure_persistence: None,
            ..ProptestConfig::default()
        }
    }

    /// Expands a 64-bit seed into the 32-byte RNG seed: little-endian in the first eight
    /// bytes, zeros elsewhere.
    fn seed_bytes(seed: u64) -> [u8; 32] {
        let mut bytes = [0u8; 32];
        bytes[..8].copy_from_slice(&seed.to_le_bytes());
        bytes
    }

    /// Optional fixed RNG seed taken from `PROPTEST_SEED`; unset or unparsable values yield
    /// `None`.
    fn proptest_seed() -> Option<[u8; 32]> {
        let raw = std::env::var("PROPTEST_SEED").ok()?;
        let seed = raw.parse::<u64>().ok()?;
        Some(seed_bytes(seed))
    }

    /// Runs `cases` generated scenarios, seeding a ChaCha RNG when `seed` is given.
    fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
        let config = ProptestConfig {
            cases,
            failure_persistence: None,
            ..ProptestConfig::default()
        };
        let mut runner = if let Some(bytes) = seed {
            TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
        } else {
            TestRunner::new(config)
        };
        let outcome = runner.run(&strategy, |scenario| {
            run_scenario(&scenario);
            Ok(())
        });
        outcome.unwrap();
    }

    /// Checks the invariants of a single scenario: resolution is repeatable, the lock takes
    /// precedence over the sources extension, failures carry a known code, and
    /// `verify_wasm_sha256` agrees with a direct digest comparison.
    fn run_scenario(scenario: &Scenario) {
        let sources = scenario.component_sources.as_ref();
        let lock = scenario.pack_lock.as_ref();

        let first = resolve_component_test(sources, lock, &scenario.request);
        let second = resolve_component_test(sources, lock, &scenario.request);
        assert_eq!(first.clone(), second.clone());

        if let Some(lock) = lock {
            let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
            assert_eq!(first.clone(), lock_only.clone());
        }

        if let Err(err) = &first {
            let known_codes = known_error_codes();
            assert!(
                known_codes.contains(err.code.as_str()),
                "unexpected error code {}: {}",
                err.code,
                err.message
            );
        }

        if let Some(expected) = scenario.expected_sha256.as_deref() {
            let verified = verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
            let actual = compute_sha256_digest_for(&scenario.bytes);
            let should_match = actual == normalize_sha256(expected).unwrap_or_default();
            if should_match {
                assert!(verified, "expected sha256 match to succeed");
            } else {
                assert!(!verified, "expected sha256 mismatch to fail");
            }
        }
    }

    /// Generates scenarios.
    ///
    /// The order and shape of the tuple of input strategies is kept exactly as it was, so that
    /// runs with a fixed seed keep producing the same scenarios.
    fn scenario_strategy() -> impl Strategy<Value = Scenario> {
        let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
        let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
        let tag_ref = any::<bool>();
        let bundled = any::<bool>();
        let include_sha = any::<bool>();
        let include_component_id = any::<bool>();
        let request_by_id = any::<bool>();
        let use_lock = any::<bool>();
        let use_sources = any::<bool>();
        let bytes = prop::collection::vec(any::<u8>(), 1..64);

        (
            name,
            alt_name,
            tag_ref,
            bundled,
            include_sha,
            include_component_id,
            request_by_id,
            use_lock,
            use_sources,
            bytes,
        )
            .prop_map(
                |(
                    name,
                    alt_name,
                    tag_ref,
                    bundled,
                    include_sha,
                    include_component_id,
                    request_by_id,
                    use_lock,
                    use_sources,
                    bytes,
                )| {
                    // The lock's component id is either a distinct alias or the name itself.
                    let component_id_str = if include_component_id {
                        alt_name
                    } else {
                        name.clone()
                    };
                    let lock_component_id = ComponentId::from_str(&component_id_str).ok();

                    let source_ref = if tag_ref {
                        format!("oci://registry.test/{name}:v1")
                    } else {
                        format!(
                            "oci://registry.test/{name}@sha256:{}",
                            hex::encode([0x11u8; 32])
                        )
                    };

                    // Only bundled components may carry a wasm hash, and only when requested.
                    let expected_sha256 =
                        (bundled && include_sha).then(|| compute_sha256_digest_for(&bytes));

                    let pack_lock = use_lock.then(|| PackLockV1 {
                        schema_version: 1,
                        components: vec![PackLockComponent {
                            name: name.clone(),
                            source_ref: Some(source_ref),
                            legacy_ref: None,
                            component_id: lock_component_id,
                            bundled: Some(bundled),
                            bundled_path: bundled.then(|| format!("components/{name}.wasm")),
                            legacy_path: None,
                            wasm_sha256: expected_sha256.clone(),
                            legacy_sha256: None,
                            resolved_digest: (!bundled).then(|| "sha256:deadbeef".to_string()),
                            digest: None,
                        }],
                    });

                    let component_sources = use_sources.then(|| {
                        let artifact = if bundled {
                            ArtifactLocationV1::Inline {
                                wasm_path: format!("components/{name}.wasm"),
                                manifest_path: None,
                            }
                        } else {
                            ArtifactLocationV1::Remote
                        };
                        ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
                            name: name.clone(),
                            component_id: ComponentId::from_str(&name).ok(),
                            source: ComponentSourceRef::from_str(
                                "oci://registry.test/component@sha256:deadbeef",
                            )
                            .expect("component ref"),
                            resolved: ResolvedComponentV1 {
                                digest: "sha256:deadbeef".to_string(),
                                signature: None,
                                signed_by: None,
                            },
                            artifact,
                            licensing_hint: None,
                            metering_hint: None,
                        }])
                    });

                    let request = if request_by_id {
                        ResolveRequest::ById(component_id_str)
                    } else {
                        ResolveRequest::ByName(name)
                    };

                    Scenario {
                        pack_lock,
                        component_sources,
                        request,
                        expected_sha256,
                        bytes,
                    }
                },
            )
    }

    /// Runs the property test with the configured case count and optional seed.
    #[test]
    fn pack_resolution_proptest() {
        let seed = proptest_seed();
        run_cases(scenario_strategy(), proptest_config().cases, seed);
    }

    /// Replays one case per seed listed in the checked-in seeds fixture; blank lines and lines
    /// starting with `#` are ignored.
    #[test]
    fn pack_resolution_regression_seeds() {
        let seeds_path =
            Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
        let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
        let seed_lines = raw
            .lines()
            .map(str::trim)
            .filter(|line| !line.is_empty() && !line.starts_with('#'));
        for line in seed_lines {
            let seed = line.parse::<u64>().expect("seed must be an integer");
            run_cases(scenario_strategy(), 1, Some(seed_bytes(seed)));
        }
    }
}
4549
4550fn locate_pack_assets(
4551    materialized_root: Option<&Path>,
4552    archive_hint: Option<&Path>,
4553) -> Result<(Option<PathBuf>, Option<TempDir>)> {
4554    if let Some(root) = materialized_root {
4555        let assets = root.join("assets");
4556        if assets.is_dir() {
4557            return Ok((Some(assets), None));
4558        }
4559    }
4560    if let Some(path) = archive_hint
4561        && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
4562    {
4563        return Ok((Some(assets), Some(tempdir)));
4564    }
4565    Ok((None, None))
4566}
4567
4568fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
4569    let file =
4570        File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
4571    let mut archive =
4572        ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
4573    let temp = TempDir::new().context("failed to create temporary assets directory")?;
4574    let mut found = false;
4575    for idx in 0..archive.len() {
4576        let mut entry = archive.by_index(idx)?;
4577        let name = entry.name();
4578        if !name.starts_with("assets/") {
4579            continue;
4580        }
4581        let dest = temp.path().join(name);
4582        if name.ends_with('/') {
4583            std::fs::create_dir_all(&dest)?;
4584            found = true;
4585            continue;
4586        }
4587        if let Some(parent) = dest.parent() {
4588            std::fs::create_dir_all(parent)?;
4589        }
4590        let mut outfile = std::fs::File::create(&dest)?;
4591        std::io::copy(&mut entry, &mut outfile)?;
4592        found = true;
4593    }
4594    if found {
4595        let assets_path = temp.path().join("assets");
4596        Ok(Some((temp, assets_path)))
4597    } else {
4598        Ok(None)
4599    }
4600}
4601
4602fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
4603    let mut opts = DistOptions {
4604        allow_tags: true,
4605        ..DistOptions::default()
4606    };
4607    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
4608        opts.cache_dir = cache_dir;
4609    }
4610    if component_resolution.dist_offline {
4611        opts.offline = true;
4612    }
4613    opts
4614}
4615
4616#[allow(clippy::too_many_arguments)]
4617async fn load_components_from_sources(
4618    cache: &CacheManager,
4619    engine: &Engine,
4620    component_sources: &HashMap<String, ComponentSourceInfo>,
4621    component_resolution: &ComponentResolution,
4622    specs: &[ComponentSpec],
4623    missing: &mut HashSet<String>,
4624    into: &mut HashMap<String, PackComponent>,
4625    materialized_root: Option<&Path>,
4626    archive_hint: Option<&Path>,
4627) -> Result<()> {
4628    let mut archive = if let Some(path) = archive_hint {
4629        Some(
4630            ZipArchive::new(File::open(path)?)
4631                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
4632        )
4633    } else {
4634        None
4635    };
4636    let mut dist_client: Option<DistClient> = None;
4637
4638    for spec in specs {
4639        if !missing.contains(&spec.id) {
4640            continue;
4641        }
4642        let Some(source) = component_sources.get(&spec.id) else {
4643            continue;
4644        };
4645
4646        let bytes = match &source.artifact {
4647            ComponentArtifactLocation::Inline { wasm_path } => {
4648                if let Some(root) = materialized_root {
4649                    let path = root.join(wasm_path);
4650                    if path.exists() {
4651                        std::fs::read(&path).with_context(|| {
4652                            format!(
4653                                "failed to read inline component {} from {}",
4654                                spec.id,
4655                                path.display()
4656                            )
4657                        })?
4658                    } else if archive.is_none() {
4659                        bail!("inline component {} missing at {}", spec.id, path.display());
4660                    } else {
4661                        read_entry(
4662                            archive.as_mut().expect("archive present when needed"),
4663                            wasm_path,
4664                        )
4665                        .with_context(|| {
4666                            format!(
4667                                "inline component {} missing at {} in pack archive",
4668                                spec.id, wasm_path
4669                            )
4670                        })?
4671                    }
4672                } else if let Some(archive) = archive.as_mut() {
4673                    read_entry(archive, wasm_path).with_context(|| {
4674                        format!(
4675                            "inline component {} missing at {} in pack archive",
4676                            spec.id, wasm_path
4677                        )
4678                    })?
4679                } else {
4680                    bail!(
4681                        "inline component {} missing and no pack source available",
4682                        spec.id
4683                    );
4684                }
4685            }
4686            ComponentArtifactLocation::Remote => {
4687                if source.source.is_tag() {
4688                    bail!(
4689                        "component {} uses tag ref {} but is not bundled; rebuild the pack",
4690                        spec.id,
4691                        source.source
4692                    );
4693                }
4694                let client = dist_client.get_or_insert_with(|| {
4695                    DistClient::new(dist_options_from(component_resolution))
4696                });
4697                let reference = source.source.to_string();
4698                fault::maybe_fail_asset(&reference)
4699                    .await
4700                    .with_context(|| format!("fault injection blocked asset {reference}"))?;
4701                let digest = source.digest.as_deref().ok_or_else(|| {
4702                    anyhow!(
4703                        "component {} missing expected digest for remote component",
4704                        spec.id
4705                    )
4706                })?;
4707                let cache_path = if let Ok(cache_path) = client.fetch_digest(digest).await {
4708                    cache_path
4709                } else if component_resolution.dist_offline {
4710                    client
4711                        .fetch_digest(digest)
4712                        .await
4713                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
4714                } else {
4715                    let source = client
4716                        .parse_source(&reference)
4717                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4718                    let descriptor = client
4719                        .resolve(source, ResolvePolicy)
4720                        .await
4721                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4722                    let resolved = client
4723                        .fetch(&descriptor, CachePolicy)
4724                        .await
4725                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4726                    let expected = normalize_digest(digest);
4727                    let actual = normalize_digest(&resolved.digest);
4728                    if expected != actual {
4729                        bail!(
4730                            "component {} digest mismatch after fetch: expected {}, got {}",
4731                            spec.id,
4732                            expected,
4733                            actual
4734                        );
4735                    }
4736                    resolved.cache_path.ok_or_else(|| {
4737                        anyhow!(
4738                            "component {} resolved from {} but cache path is missing",
4739                            spec.id,
4740                            reference
4741                        )
4742                    })?
4743                };
4744                std::fs::read(&cache_path).with_context(|| {
4745                    format!(
4746                        "failed to read cached component {} from {}",
4747                        spec.id,
4748                        cache_path.display()
4749                    )
4750                })?
4751            }
4752        };
4753
4754        if let Some(expected) = source.expected_wasm_sha256.as_deref() {
4755            verify_wasm_sha256(&spec.id, expected, &bytes)?;
4756        } else if source.skip_digest_verification {
4757            let actual = compute_sha256_digest_for(&bytes);
4758            warn!(
4759                component_id = %spec.id,
4760                digest = %actual,
4761                "bundled component missing wasm_sha256; allowing due to flag"
4762            );
4763        } else {
4764            let expected = source.digest.as_deref().ok_or_else(|| {
4765                anyhow!(
4766                    "component {} missing expected digest for verification",
4767                    spec.id
4768                )
4769            })?;
4770            verify_component_digest(&spec.id, expected, &bytes)?;
4771        }
4772        let component =
4773            compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
4774                .await
4775                .with_context(|| format!("failed to compile component {}", spec.id))?;
4776        into.insert(
4777            spec.id.clone(),
4778            PackComponent {
4779                name: spec.id.clone(),
4780                version: spec.version.clone(),
4781                component,
4782            },
4783        );
4784        missing.remove(&spec.id);
4785    }
4786
4787    Ok(())
4788}
4789
4790fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
4791    match err {
4792        DistError::NotFound { reference: missing } => anyhow!(
4793            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4794            component_id,
4795            missing,
4796            reference
4797        ),
4798        DistError::Offline { reference: blocked } => anyhow!(
4799            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4800            component_id,
4801            blocked,
4802            reference
4803        ),
4804        DistError::Unauthorized { target } => anyhow!(
4805            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4806            component_id,
4807            target,
4808            reference
4809        ),
4810        other => anyhow!(
4811            "failed to resolve component {} from {}: {}",
4812            component_id,
4813            reference,
4814            other
4815        ),
4816    }
4817}
4818
4819async fn load_components_from_overrides(
4820    cache: &CacheManager,
4821    engine: &Engine,
4822    overrides: &HashMap<String, PathBuf>,
4823    specs: &[ComponentSpec],
4824    missing: &mut HashSet<String>,
4825    into: &mut HashMap<String, PackComponent>,
4826) -> Result<()> {
4827    for spec in specs {
4828        if !missing.contains(&spec.id) {
4829            continue;
4830        }
4831        let Some(path) = overrides.get(&spec.id) else {
4832            continue;
4833        };
4834        let bytes = std::fs::read(path)
4835            .with_context(|| format!("failed to read override component {}", path.display()))?;
4836        let component = compile_component_with_cache(cache, engine, None, bytes)
4837            .await
4838            .with_context(|| {
4839                format!(
4840                    "failed to compile component {} from override {}",
4841                    spec.id,
4842                    path.display()
4843                )
4844            })?;
4845        into.insert(
4846            spec.id.clone(),
4847            PackComponent {
4848                name: spec.id.clone(),
4849                version: spec.version.clone(),
4850                component,
4851            },
4852        );
4853        missing.remove(&spec.id);
4854    }
4855    Ok(())
4856}
4857
4858async fn load_components_from_dir(
4859    cache: &CacheManager,
4860    engine: &Engine,
4861    root: &Path,
4862    specs: &[ComponentSpec],
4863    missing: &mut HashSet<String>,
4864    into: &mut HashMap<String, PackComponent>,
4865) -> Result<()> {
4866    for spec in specs {
4867        if !missing.contains(&spec.id) {
4868            continue;
4869        }
4870        let path = component_path_for_spec(root, spec);
4871        if !path.exists() {
4872            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
4873            continue;
4874        }
4875        let bytes = std::fs::read(&path)
4876            .with_context(|| format!("failed to read component {}", path.display()))?;
4877        let component = compile_component_with_cache(cache, engine, None, bytes)
4878            .await
4879            .with_context(|| {
4880                format!(
4881                    "failed to compile component {} from {}",
4882                    spec.id,
4883                    path.display()
4884                )
4885            })?;
4886        into.insert(
4887            spec.id.clone(),
4888            PackComponent {
4889                name: spec.id.clone(),
4890                version: spec.version.clone(),
4891                component,
4892            },
4893        );
4894        missing.remove(&spec.id);
4895    }
4896    Ok(())
4897}
4898
4899async fn load_components_from_archive(
4900    cache: &CacheManager,
4901    engine: &Engine,
4902    path: &Path,
4903    specs: &[ComponentSpec],
4904    missing: &mut HashSet<String>,
4905    into: &mut HashMap<String, PackComponent>,
4906) -> Result<()> {
4907    let mut archive = ZipArchive::new(File::open(path)?)
4908        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
4909    for spec in specs {
4910        if !missing.contains(&spec.id) {
4911            continue;
4912        }
4913        let file_name = spec
4914            .legacy_path
4915            .clone()
4916            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
4917        let bytes = match read_entry(&mut archive, &file_name) {
4918            Ok(bytes) => bytes,
4919            Err(err) => {
4920                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
4921                continue;
4922            }
4923        };
4924        let component = compile_component_with_cache(cache, engine, None, bytes)
4925            .await
4926            .with_context(|| format!("failed to compile component {}", spec.id))?;
4927        into.insert(
4928            spec.id.clone(),
4929            PackComponent {
4930                name: spec.id.clone(),
4931                version: spec.version.clone(),
4932                component,
4933            },
4934        );
4935        missing.remove(&spec.id);
4936    }
4937    Ok(())
4938}
4939
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// Merges `incoming` into `base` with `merge_in` and returns the resulting outcome.
    fn merged(mut base: IdentifyOutcome, incoming: IdentifyOutcome) -> IdentifyOutcome {
        base.merge_in(incoming);
        base
    }

    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let template_input = json!({ "template": "Hi {{name}}" });

        let mut nodes = IndexMap::new();
        let mut raw = IndexMap::new();
        raw.insert("templating.handlebars".into(), template_input.clone());
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };
        nodes.insert("start".into(), start_node);
        let doc = FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            meta: None,
            slot_schema: None,
            nodes,
        };

        let normalized = normalize_flow_doc(doc);
        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        // The raw key becomes the component, with the default `render` operation.
        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component"),
            Some(&Value::String("templating.handlebars".into()))
        );
        assert_eq!(
            payload.get("operation"),
            Some(&Value::String("render".into()))
        );
        assert_eq!(payload.get("input"), Some(&template_input));
    }

    #[test]
    fn normalizes_canonical_operation_node_to_component_exec_with_config() {
        const COMPONENT_REF: &str =
            "oci://ghcr.io/greenticai/component/component-llm-openai:stable";
        let config = json!({
            "provider": "ollama",
            "base_url": "http://127.0.0.1:11434/v1",
            "default_model": "llama3.2"
        });
        let input = json!({
            "messages": [{
                "role": "user",
                "content": "Say hello from Ollama."
            }]
        });

        let mut nodes = IndexMap::new();
        let mut raw = IndexMap::new();
        raw.insert(
            "handle_message".into(),
            json!({
                "component": COMPONENT_REF,
                "config": config.clone(),
                "input": input.clone()
            }),
        );
        let llm_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };
        nodes.insert("llm".into(), llm_node);
        let doc = FlowDoc {
            id: "ollama-repro".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("llm".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            meta: None,
            slot_schema: None,
            nodes,
        };

        let normalized = normalize_flow_doc(doc);
        let node = normalized.nodes.get("llm").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        // The raw key is the operation; component, config and input are carried over.
        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component"),
            Some(&Value::String(COMPONENT_REF.into()))
        );
        assert_eq!(
            payload.get("operation"),
            Some(&Value::String("handle_message".into()))
        );
        assert_eq!(payload.get("config"), Some(&config));
        assert_eq!(payload.get("input"), Some(&input));
    }

    #[test]
    fn missing_export_error_detection_recognises_bindgen_shapes() {
        // Positive: identity-world missing-instance error
        let identity_instance = "instantiation: no exported instance named \
             `greentic:provider-instance-identity/instance-identity-api@0.1.0`";
        assert!(is_missing_export_error(identity_instance));

        // Positive: identity-world missing-function error
        let identity_function = "instantiation: no exported function named `identify-instance`";
        assert!(is_missing_export_error(identity_function));

        // Negative: unrelated trap
        let unrelated_trap = "Wasm trap: out of bounds memory access";
        assert!(!is_missing_export_error(unrelated_trap));

        // Negative: a DIFFERENT world's missing export must NOT match —
        // e.g. schema-core missing is a hard error, not "unsupported"
        let other_world_instance = "instantiation: no exported instance named \
             `greentic:provider-schema-core/schema-core-api@1.0.0`";
        assert!(!is_missing_export_error(other_world_instance));

        // Negative: broad marker present but for a non-identity function
        let other_function = "instantiation: no exported function named `invoke`";
        assert!(!is_missing_export_error(other_function));
    }

    #[test]
    fn identify_outcome_merge_in_follows_lattice() {
        fn unsupported() -> IdentifyOutcome {
            IdentifyOutcome::Unsupported
        }
        fn no_match() -> IdentifyOutcome {
            IdentifyOutcome::NoMatch
        }
        fn id_a() -> IdentifyOutcome {
            IdentifyOutcome::Identified("a".to_string())
        }
        fn id_b() -> IdentifyOutcome {
            IdentifyOutcome::Identified("b".to_string())
        }

        // Unsupported is the floor — every other variant promotes it.
        assert_eq!(merged(unsupported(), unsupported()), unsupported());
        assert_eq!(merged(unsupported(), no_match()), no_match());
        assert_eq!(merged(unsupported(), id_a()), id_a());

        // NoMatch beats Unsupported but is overridable by Identified.
        assert_eq!(
            merged(no_match(), unsupported()),
            no_match(),
            "NoMatch must not downgrade to Unsupported"
        );
        assert_eq!(merged(no_match(), no_match()), no_match());
        assert_eq!(
            merged(no_match(), id_a()),
            id_a(),
            "Identified must override NoMatch"
        );

        // Identified is the top — nothing overwrites it (first id wins).
        assert_eq!(merged(id_a(), unsupported()), id_a());
        assert_eq!(merged(id_a(), no_match()), id_a());
        assert_eq!(
            merged(id_a(), id_b()),
            id_a(),
            "first Identified wins; later id does not replace"
        );
    }
}
5145
#[cfg(test)]
mod identify_endpoints_pack_tests {
    use super::*;
    use crate::config::{
        FlowRetryConfig, HostConfig, OperatorPolicy, RateLimits, SecretsPolicy, StateStorePolicy,
        WebhookPolicy,
    };
    use crate::trace::TraceConfig;
    use crate::validate::ValidationConfig;

    /// Minimal host configuration for these tests: HTTP disabled, permissive secrets and
    /// operator policies, and everything else empty or defaulted.
    fn test_host_config() -> HostConfig {
        HostConfig {
            tenant: "test".to_string(),
            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
            flow_type_bindings: HashMap::new(),
            rate_limits: RateLimits::default(),
            retry: FlowRetryConfig::default(),
            http_enabled: false,
            secrets_policy: SecretsPolicy::allow_all(),
            state_store_policy: StateStorePolicy::default(),
            webhook_policy: WebhookPolicy::default(),
            timers: Vec::new(),
            oauth: None,
            mocks: None,
            pack_bindings: Vec::new(),
            env_passthrough: Vec::new(),
            trace: TraceConfig::from_env(),
            validation: ValidationConfig::from_env(),
            operator_policy: OperatorPolicy::allow_all(),
            fast2flow: Default::default(),
        }
    }

    #[tokio::test]
    async fn no_manifest_returns_unsupported_for_all_types() {
        const PROVIDER_TYPES: [&str; 3] = ["teams", "slack", "telegram"];

        // A PackRuntime with manifest: None (e.g. legacy single-component
        // packs or the for_component_test constructor) has no provider
        // registry. Every requested type must map to Unsupported — NOT
        // NoMatch — so the caller knows it can fall back to the static
        // provider_id rather than failing closed.
        let host = Arc::new(test_host_config());
        let pack = PackRuntime::for_component_test(Vec::new(), HashMap::new(), "test-pack", host)
            .expect("empty pack construction");

        let result = pack
            .identify_endpoints_by_provider_type(&PROVIDER_TYPES, b"{}")
            .await
            .expect("no-manifest path must succeed");

        assert_eq!(result.len(), 3);
        for ty in PROVIDER_TYPES {
            assert_eq!(
                result.get(ty),
                Some(&IdentifyOutcome::Unsupported),
                "type '{ty}' must be Unsupported when pack has no manifest"
            );
        }
    }

    #[tokio::test]
    async fn empty_provider_types_returns_empty_map() {
        let host = Arc::new(test_host_config());
        let pack = PackRuntime::for_component_test(Vec::new(), HashMap::new(), "test-pack", host)
            .expect("empty pack construction");

        let result = pack
            .identify_endpoints_by_provider_type(&[], b"{}")
            .await
            .expect("empty types fast path");

        assert!(result.is_empty());
    }
}
5223
/// Summary information about a pack: its identity, version, entry flows and declared secret
/// requirements.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Version of the pack, kept as a string.
    pub version: String,
    /// Ids of the pack's entry flows; empty when absent from the serialized form.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secret requirements declared by the pack; empty when absent from the serialized form.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
5233
5234impl PackMetadata {
5235    fn from_wasm(bytes: &[u8]) -> Option<Self> {
5236        let parser = Parser::new(0);
5237        for payload in parser.parse_all(bytes) {
5238            let payload = payload.ok()?;
5239            match payload {
5240                Payload::CustomSection(section) => {
5241                    if section.name() == "greentic.manifest"
5242                        && let Ok(meta) = Self::from_bytes(section.data())
5243                    {
5244                        return Some(meta);
5245                    }
5246                }
5247                Payload::DataSection(reader) => {
5248                    for segment in reader.into_iter().flatten() {
5249                        if let Ok(meta) = Self::from_bytes(segment.data) {
5250                            return Some(meta);
5251                        }
5252                    }
5253                }
5254                _ => {}
5255            }
5256        }
5257        None
5258    }
5259
5260    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
5261        #[derive(Deserialize)]
5262        struct RawManifest {
5263            pack_id: String,
5264            version: String,
5265            #[serde(default)]
5266            entry_flows: Vec<String>,
5267            #[serde(default)]
5268            flows: Vec<RawFlow>,
5269            #[serde(default)]
5270            secret_requirements: Vec<greentic_types::SecretRequirement>,
5271        }
5272
5273        #[derive(Deserialize)]
5274        struct RawFlow {
5275            id: String,
5276        }
5277
5278        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
5279        let mut entry_flows = if manifest.entry_flows.is_empty() {
5280            manifest.flows.iter().map(|f| f.id.clone()).collect()
5281        } else {
5282            manifest.entry_flows.clone()
5283        };
5284        entry_flows.retain(|id| !id.is_empty());
5285        Ok(Self {
5286            pack_id: manifest.pack_id,
5287            version: manifest.version,
5288            entry_flows,
5289            secret_requirements: manifest.secret_requirements,
5290        })
5291    }
5292
5293    pub fn fallback(path: &Path) -> Self {
5294        let pack_id = path
5295            .file_stem()
5296            .map(|s| s.to_string_lossy().into_owned())
5297            .unwrap_or_else(|| "unknown-pack".to_string());
5298        Self {
5299            pack_id,
5300            version: "0.0.0".to_string(),
5301            entry_flows: Vec::new(),
5302            secret_requirements: Vec::new(),
5303        }
5304    }
5305
5306    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
5307        let entry_flows = manifest
5308            .flows
5309            .iter()
5310            .map(|flow| flow.id.as_str().to_string())
5311            .collect::<Vec<_>>();
5312        Self {
5313            pack_id: manifest.pack_id.as_str().to_string(),
5314            version: manifest.version.to_string(),
5315            entry_flows,
5316            secret_requirements: manifest.secret_requirements.clone(),
5317        }
5318    }
5319}