Skip to main content

greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use futures::executor::block_on;
20use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
21use greentic_interfaces_wasmtime::host_helpers::v1::{
22    self as host_v1, HostFns, add_all_v1_to_linker,
23    runner_host_http::RunnerHostHttp,
24    runner_host_kv::RunnerHostKv,
25    secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
26    state_store::{
27        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
28        StateStoreHost, TenantCtx as StateTenantCtx,
29    },
30    telemetry_logger::{
31        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
32        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
33        TenantCtx as TelemetryTenantCtx,
34    },
35};
36use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
37use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
38use greentic_pack::builder as legacy_pack;
39use greentic_types::flow::FlowHasher;
40use greentic_types::{
41    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
42    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
43    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
44    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
45    pack_manifest::ExtensionInline,
46};
47use host_v1::http_client::{
48    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
49    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
50    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
51    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
52};
53use indexmap::IndexMap;
54use once_cell::sync::Lazy;
55use parking_lot::{Mutex, RwLock};
56use reqwest::blocking::Client as BlockingClient;
57use runner_core::normalize_under_root;
58use serde::{Deserialize, Serialize};
59use serde_cbor;
60use serde_json::{self, Value};
61use sha2::Digest;
62use tempfile::TempDir;
63use tokio::fs;
64use wasmparser::{Parser, Payload};
65use wasmtime::{Store, StoreContextMut};
66use zip::ZipArchive;
67
68use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
69use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
70use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
71#[cfg(feature = "fault-injection")]
72use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
73
74use crate::config::HostConfig;
75use crate::fault;
76use crate::secrets::{DynSecretsManager, read_secret_blocking, write_secret_blocking};
77use crate::storage::state::STATE_PREFIX;
78use crate::storage::{DynSessionStore, DynStateStore};
79use crate::verify;
80use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
81use tracing::warn;
82use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
83use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
84
85use greentic_flow::model::FlowDoc;
86
/// Per-pack runtime state: the component artifact location, parsed manifests,
/// compiled components, and the host services (HTTP client, stores, secrets,
/// caches) held alongside them.
///
/// NOTE(review): the field descriptions below are derived from field names and
/// types only; the code that populates them is not visible in this section.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Component artifact path (wasm file).
    path: PathBuf,
    /// Optional archive (.gtpack) used to load flows/manifests.
    archive_path: Option<PathBuf>,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Engine handle from the runtime_wasmtime module.
    engine: Engine,
    /// Pack metadata.
    metadata: PackMetadata,
    /// Pack manifest in the `greentic_types` format, when available.
    manifest: Option<greentic_types::PackManifest>,
    /// Pack manifest in the legacy `greentic_pack::builder` format, when available.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Component manifests keyed by string (presumably the component id — confirm).
    component_manifests: HashMap<String, ComponentManifest>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows of the pack, when present.
    flows: Option<PackFlows>,
    /// Components keyed by string (presumably the component reference — confirm).
    components: HashMap<String, PackComponent>,
    /// Shared blocking HTTP client.
    http_client: Arc<BlockingClient>,
    /// `InstancePre` values keyed by string, guarded by a mutex.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    /// Optional session store.
    session_store: Option<DynSessionStore>,
    /// Optional state store.
    state_store: Option<DynStateStore>,
    /// WASI policy shared with component instances.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Optional temporary directory; presumably holds extracted pack assets and
    /// is removed when this value is dropped — TODO confirm.
    assets_tempdir: Option<TempDir>,
    /// Optional provider registry behind a read/write lock.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Secrets manager handle.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// Cache manager (see `crate::cache`).
    cache: CacheManager,
}
113
/// A component belonging to a pack, paired with its shared `Component` handle.
struct PackComponent {
    /// Component name (currently unused, hence the lint allowance).
    #[allow(dead_code)]
    name: String,
    /// Component version string (currently unused, hence the lint allowance).
    #[allow(dead_code)]
    version: String,
    /// Shared handle to the component.
    component: Arc<Component>,
}
121
122fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
123where
124    F: FnOnce() -> Result<T> + Send + 'static,
125    T: Send + 'static,
126{
127    let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
128    let handle = builder
129        .spawn(move || {
130            let pid = std::process::id();
131            let thread_id = std::thread::current().id();
132            let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
133            tracing::info!(
134                event = "wasmtime.thread.start",
135                task = task_name,
136                pid,
137                thread_id = ?thread_id,
138                tokio_handle_present,
139                "starting Wasmtime thread"
140            );
141            task()
142        })
143        .context("failed to spawn Wasmtime thread")?;
144    handle
145        .join()
146        .map_err(|err| {
147            let reason = if let Some(msg) = err.downcast_ref::<&str>() {
148                msg.to_string()
149            } else if let Some(msg) = err.downcast_ref::<String>() {
150                msg.clone()
151            } else {
152                "unknown panic".to_string()
153            };
154            anyhow!("Wasmtime thread panicked: {reason}")
155        })
156        .and_then(|res| res)
157}
158
/// Options describing where component wasm artifacts may be resolved from:
/// a materialized pack directory, explicit per-component overrides, and the
/// remote distribution cache settings.
///
/// The `Default` value has no materialized root, no overrides, online
/// distribution, no cache directory, and does not allow missing hashes.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
    pub materialized_root: Option<PathBuf>,
    /// Explicit overrides mapping component id -> wasm path.
    pub overrides: HashMap<String, PathBuf>,
    /// If true, do not fetch remote components; require cached artifacts.
    pub dist_offline: bool,
    /// Optional cache directory for resolved remote components.
    pub dist_cache_dir: Option<PathBuf>,
    /// Allow bundled components without wasm_sha256 (dev-only escape hatch).
    pub allow_missing_hash: bool,
}
172
173fn build_blocking_client() -> BlockingClient {
174    std::thread::spawn(|| {
175        BlockingClient::builder()
176            .no_proxy()
177            .build()
178            .expect("blocking client")
179    })
180    .join()
181    .expect("client build thread panicked")
182}
183
184fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
185    let (root, candidate) = if path.is_absolute() {
186        let parent = path
187            .parent()
188            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
189        let root = parent
190            .canonicalize()
191            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
192        let file = path
193            .file_name()
194            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
195        (root, PathBuf::from(file))
196    } else {
197        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
198        let base = if let Some(parent) = path.parent() {
199            cwd.join(parent)
200        } else {
201            cwd
202        };
203        let root = base
204            .canonicalize()
205            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
206        let file = path
207            .file_name()
208            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
209        (root, PathBuf::from(file))
210    };
211    let safe = normalize_under_root(&root, &candidate)?;
212    Ok((root, safe))
213}
214
/// Process-wide blocking HTTP client, created lazily on first access via
/// `build_blocking_client` and shared behind an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
216
/// Serializable summary of a flow: its identity, kind, owning pack, profile
/// and version, plus an optional description.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Identifier of the pack this flow belongs to.
    pub pack_id: String,
    /// Profile name.
    pub profile: String,
    /// Version string.
    pub version: String,
    /// Optional human-readable description; `None` when absent from the input.
    #[serde(default)]
    pub description: Option<String>,
}
228
/// State backing the host interfaces implemented in this file (secrets store,
/// HTTP client, state store) for a component invocation.
pub struct HostState {
    /// Identifier of the owning pack.
    #[allow(dead_code)]
    pack_id: String,
    /// Shared host configuration (HTTP enablement, secrets policy, tenant).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name used when a tenant context does not supply one; taken
    /// from `GREENTIC_ENV` in `new`, falling back to `"local"`.
    default_env: String,
    /// Optional session store.
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store; state operations report "unavailable" without it.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real secrets lookups and HTTP calls.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used for reads and writes.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state, default-constructed in `new`.
    oauth_host: OAuthBrokerHost,
    /// Execution context of the current invocation, when there is one; used as
    /// a fallback source of tenant/team/user/flow/node identifiers.
    exec_ctx: Option<ComponentExecCtx>,
    /// Reference of the component being served; `None` is logged as `<pack>`.
    component_ref: Option<String>,
    /// Whether the component is a provider-core component; such components may
    /// still write secrets in provider-core-only mode.
    provider_core_component: bool,
}
246
247impl HostState {
248    #[allow(clippy::default_constructed_unit_structs)]
249    #[allow(clippy::too_many_arguments)]
250    pub fn new(
251        pack_id: String,
252        config: Arc<HostConfig>,
253        http_client: Arc<BlockingClient>,
254        mocks: Option<Arc<MockLayer>>,
255        session_store: Option<DynSessionStore>,
256        state_store: Option<DynStateStore>,
257        secrets: DynSecretsManager,
258        oauth_config: Option<OAuthBrokerConfig>,
259        exec_ctx: Option<ComponentExecCtx>,
260        component_ref: Option<String>,
261        provider_core_component: bool,
262    ) -> Result<Self> {
263        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
264        Ok(Self {
265            pack_id,
266            config,
267            http_client,
268            default_env,
269            session_store,
270            state_store,
271            mocks,
272            secrets,
273            oauth_config,
274            oauth_host: OAuthBrokerHost::default(),
275            exec_ctx,
276            component_ref,
277            provider_core_component,
278        })
279    }
280
    /// Instantiates `component` and calls its node `invoke` export with the
    /// given operation and JSON input, returning the version-neutral
    /// `InvokeResult`.
    ///
    /// The v0.5 component bindings are tried first. If building them fails and
    /// `is_missing_node_export(&err, "0.5.0")` reports that the 0.5.0 node
    /// export is missing, the v0.4 bindings are used instead; any other error
    /// is returned as-is.
    ///
    /// # Errors
    ///
    /// Propagates failures from pre-instantiation, binding construction,
    /// instantiation, and the `invoke` call itself.
    fn instantiate_component_result(
        linker: &mut Linker<ComponentState>,
        store: &mut Store<ComponentState>,
        component: &Component,
        ctx: &ComponentExecCtx,
        operation: &str,
        input_json: &str,
    ) -> Result<InvokeResult> {
        let pre_instance = linker.instantiate_pre(component)?;
        match component_api::v0_5::ComponentPre::new(pre_instance) {
            Ok(pre) => {
                // Instantiation is async; drive it to completion on this thread.
                let result = block_on(async {
                    let bindings = pre.instantiate_async(&mut *store).await?;
                    let node = bindings.greentic_component_node();
                    let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
                    let operation_owned = operation.to_string();
                    let input_owned = input_json.to_string();
                    node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
                })?;
                Ok(component_api::invoke_result_from_v0_5(result))
            }
            Err(err) => {
                if is_missing_node_export(&err, "0.5.0") {
                    // `pre_instance` was consumed by the v0.5 attempt, so build
                    // a fresh one for the v0.4 bindings.
                    let pre_instance = linker.instantiate_pre(component)?;
                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
                        component_api::v0_4::ComponentPre::new(pre_instance)?;
                    let result = block_on(async {
                        let bindings = pre.instantiate_async(&mut *store).await?;
                        let node = bindings.greentic_component_node();
                        let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
                        let operation_owned = operation.to_string();
                        let input_owned = input_json.to_string();
                        node.call_invoke(&mut *store, &ctx_v04, &operation_owned, &input_owned)
                    })?;
                    Ok(component_api::invoke_result_from_v0_4(result))
                } else {
                    Err(err)
                }
            }
        }
    }
322
323    fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
324        match result {
325            InvokeResult::Ok(body) => {
326                if body.is_empty() {
327                    return Ok(Value::Null);
328                }
329                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
330            }
331            InvokeResult::Err(NodeError {
332                code,
333                message,
334                retryable,
335                backoff_ms,
336                details,
337            }) => {
338                let mut obj = serde_json::Map::new();
339                obj.insert("ok".into(), Value::Bool(false));
340                let mut error = serde_json::Map::new();
341                error.insert("code".into(), Value::String(code));
342                error.insert("message".into(), Value::String(message));
343                error.insert("retryable".into(), Value::Bool(retryable));
344                if let Some(backoff) = backoff_ms {
345                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
346                }
347                if let Some(details) = details {
348                    error.insert(
349                        "details".into(),
350                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
351                    );
352                }
353                obj.insert("error".into(), Value::Object(error));
354                Ok(Value::Object(obj))
355            }
356        }
357    }
358
359    pub fn get_secret(&self, key: &str) -> Result<String> {
360        if provider_core_only::is_enabled() {
361            bail!(provider_core_only::blocked_message("secrets"))
362        }
363        if !self.config.secrets_policy.is_allowed(key) {
364            bail!("secret {key} is not permitted by bindings policy");
365        }
366        if let Some(mock) = &self.mocks
367            && let Some(value) = mock.secrets_lookup(key)
368        {
369            return Ok(value);
370        }
371        let ctx = self.config.tenant_ctx();
372        let bytes = read_secret_blocking(&self.secrets, &ctx, key)
373            .context("failed to read secret from manager")?;
374        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
375        Ok(value)
376    }
377
378    fn allows_secret_write_in_provider_core_only(&self) -> bool {
379        self.provider_core_component || self.component_ref.is_none()
380    }
381
382    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
383        let tenant_raw = ctx
384            .as_ref()
385            .map(|ctx| ctx.tenant.clone())
386            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
387            .unwrap_or_else(|| self.config.tenant.clone());
388        let env_raw = ctx
389            .as_ref()
390            .map(|ctx| ctx.env.clone())
391            .unwrap_or_else(|| self.default_env.clone());
392        let tenant_id = TenantId::from_str(&tenant_raw)
393            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
394        let env_id = EnvId::from_str(&env_raw)
395            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
396        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
397        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
398            if let Some(team) = exec_ctx.tenant.team.as_ref() {
399                let team_id =
400                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
401                tenant_ctx = tenant_ctx.with_team(Some(team_id));
402            }
403            if let Some(user) = exec_ctx.tenant.user.as_ref() {
404                let user_id =
405                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
406                tenant_ctx = tenant_ctx.with_user(Some(user_id));
407            }
408            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
409            if let Some(node) = exec_ctx.node_id.as_ref() {
410                tenant_ctx = tenant_ctx.with_node(node.clone());
411            }
412            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
413                tenant_ctx = tenant_ctx.with_session(session.clone());
414            }
415            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
416        }
417
418        if let Some(ctx) = ctx {
419            if let Some(team) = ctx.team.or(ctx.team_id) {
420                let team_id =
421                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
422                tenant_ctx = tenant_ctx.with_team(Some(team_id));
423            }
424            if let Some(user) = ctx.user.or(ctx.user_id) {
425                let user_id =
426                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
427                tenant_ctx = tenant_ctx.with_user(Some(user_id));
428            }
429            if let Some(flow) = ctx.flow_id {
430                tenant_ctx = tenant_ctx.with_flow(flow);
431            }
432            if let Some(node) = ctx.node_id {
433                tenant_ctx = tenant_ctx.with_node(node);
434            }
435            if let Some(provider) = ctx.provider_id {
436                tenant_ctx = tenant_ctx.with_provider(provider);
437            }
438            if let Some(session) = ctx.session_id {
439                tenant_ctx = tenant_ctx.with_session(session);
440            }
441            tenant_ctx.trace_id = ctx.trace_id;
442        }
443        Ok(tenant_ctx)
444    }
445
    /// Performs an outbound HTTP request on behalf of a component, honouring
    /// the host policy and the optional mock layer.
    ///
    /// Flow:
    /// 1. Reject with code `denied` when `config.http_enabled` is false.
    /// 2. If a mock layer is configured and mock request metadata can be built,
    ///    ask it for a decision: return the canned response, deny, or pass the
    ///    request through (optionally recording the real response).
    /// 3. Build and send the request with the shared blocking client.
    /// 4. Collect status, headers and body; record them with the mock layer
    ///    when the passthrough decision asked for recording.
    ///
    /// `opts` may set a timeout; `allow_insecure` and `follow_redirects=false`
    /// are not implemented and only produce a warning. `_ctx` is unused.
    ///
    /// # Errors
    ///
    /// Returns `denied` when HTTP is disabled or the mock layer denies the
    /// request, and `unavailable` when sending the request fails.
    fn send_http_request(
        &mut self,
        req: HttpRequest,
        opts: Option<HttpRequestOptionsV1_1>,
        _ctx: Option<HttpTenantCtx>,
    ) -> Result<HttpResponse, HttpClientError> {
        if !self.config.http_enabled {
            return Err(HttpClientError {
                code: "denied".into(),
                message: "http client disabled by policy".into(),
            });
        }

        // Holds (request metadata, record flag) when the mock layer lets the
        // request pass through to the network.
        let mut mock_state = None;
        let raw_body = req.body.clone();
        if let Some(mock) = &self.mocks
            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
        {
            match mock.http_begin(&meta) {
                HttpDecision::Mock(response) => {
                    let headers = response
                        .headers
                        .iter()
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect();
                    return Ok(HttpResponse {
                        status: response.status,
                        headers,
                        body: response.body.clone().map(|b| b.into_bytes()),
                    });
                }
                HttpDecision::Deny(reason) => {
                    return Err(HttpClientError {
                        code: "denied".into(),
                        message: reason,
                    });
                }
                HttpDecision::Passthrough { record } => {
                    mock_state = Some((meta, record));
                }
            }
        }

        // NOTE(review): a method string that fails to parse is silently sent
        // as GET rather than rejected — confirm this fallback is intended.
        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
        let mut builder = self.http_client.request(method, &req.url);
        // Headers whose name or value is not valid are skipped without error.
        for (key, value) in req.headers {
            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
            {
                builder = builder.header(header, header_value);
            }
        }

        if let Some(body) = raw_body.clone() {
            builder = builder.body(body);
        }

        if let Some(opts) = opts {
            if let Some(timeout_ms) = opts.timeout_ms {
                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
            }
            // The two options below are accepted but not honoured; the shared
            // client's defaults apply.
            if opts.allow_insecure == Some(true) {
                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
            }
            if let Some(follow_redirects) = opts.follow_redirects
                && !follow_redirects
            {
                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
            }
        }

        let response = match builder.send() {
            Ok(resp) => resp,
            Err(err) => {
                warn!(url = %req.url, error = %err, "http client request failed");
                return Err(HttpClientError {
                    code: "unavailable".into(),
                    message: err.to_string(),
                });
            }
        };

        let status = response.status().as_u16();
        // Header values that are not visible ASCII are reported as empty strings.
        let headers_vec = response
            .headers()
            .iter()
            .map(|(k, v)| {
                (
                    k.as_str().to_string(),
                    v.to_str().unwrap_or_default().to_string(),
                )
            })
            .collect::<Vec<_>>();
        // NOTE(review): a failure while reading the body is swallowed and
        // surfaces as a response with `body: None`.
        let body_bytes = response.bytes().ok().map(|b| b.to_vec());

        // Record the real response only when the passthrough decision asked for it.
        if let Some((meta, true)) = mock_state.take()
            && let Some(mock) = &self.mocks
        {
            let recorded = HttpMockResponse::new(
                status,
                headers_vec.clone().into_iter().collect(),
                body_bytes
                    .as_ref()
                    .map(|b| String::from_utf8_lossy(b).into_owned()),
            );
            mock.http_record(&meta, &recorded);
        }

        Ok(HttpResponse {
            status,
            headers: headers_vec,
            body: body_bytes,
        })
    }
560}
561
562impl SecretsStoreHost for HostState {
563    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
564        if provider_core_only::is_enabled() {
565            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
566            return Err(SecretsError::Denied);
567        }
568        if !self.config.secrets_policy.is_allowed(&key) {
569            return Err(SecretsError::Denied);
570        }
571        if let Some(mock) = &self.mocks
572            && let Some(value) = mock.secrets_lookup(&key)
573        {
574            return Ok(Some(value.into_bytes()));
575        }
576        let ctx = self.config.tenant_ctx();
577        match read_secret_blocking(&self.secrets, &ctx, &key) {
578            Ok(bytes) => Ok(Some(bytes)),
579            Err(err) => {
580                warn!(secret = %key, error = %err, "secret lookup failed");
581                Err(SecretsError::NotFound)
582            }
583        }
584    }
585}
586
587impl SecretsStoreHostV1_1 for HostState {
588    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
589        if provider_core_only::is_enabled() {
590            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
591            return Err(SecretsErrorV1_1::Denied);
592        }
593        if !self.config.secrets_policy.is_allowed(&key) {
594            return Err(SecretsErrorV1_1::Denied);
595        }
596        if let Some(mock) = &self.mocks
597            && let Some(value) = mock.secrets_lookup(&key)
598        {
599            return Ok(Some(value.into_bytes()));
600        }
601        let ctx = self.config.tenant_ctx();
602        match read_secret_blocking(&self.secrets, &ctx, &key) {
603            Ok(bytes) => Ok(Some(bytes)),
604            Err(err) => {
605                warn!(secret = %key, error = %err, "secret lookup failed");
606                Err(SecretsErrorV1_1::NotFound)
607            }
608        }
609    }
610
611    fn put(&mut self, key: String, value: Vec<u8>) {
612        if key.trim().is_empty() {
613            warn!(secret = %key, "secret write blocked: empty key");
614            panic!("secret write denied for key {key}: invalid key");
615        }
616        if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
617            warn!(
618                secret = %key,
619                component = self.component_ref.as_deref().unwrap_or("<pack>"),
620                "provider-core only mode enabled; blocking secrets store write"
621            );
622            panic!("secret write denied for key {key}: provider-core-only mode");
623        }
624        if !self.config.secrets_policy.is_allowed(&key) {
625            warn!(secret = %key, "secret write denied by bindings policy");
626            panic!("secret write denied for key {key}: policy");
627        }
628        let ctx = self.config.tenant_ctx();
629        if let Err(err) = write_secret_blocking(&self.secrets, &ctx, &key, &value) {
630            warn!(secret = %key, error = %err, "secret write failed");
631            panic!("secret write failed for key {key}");
632        }
633    }
634}
635
636impl HttpClientHost for HostState {
637    fn send(
638        &mut self,
639        req: HttpRequest,
640        ctx: Option<HttpTenantCtx>,
641    ) -> Result<HttpResponse, HttpClientError> {
642        self.send_http_request(req, None, ctx)
643    }
644}
645
646impl HttpClientHostV1_1 for HostState {
647    fn send(
648        &mut self,
649        req: HttpRequestV1_1,
650        opts: Option<HttpRequestOptionsV1_1>,
651        ctx: Option<HttpTenantCtxV1_1>,
652    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
653        let legacy_req = HttpRequest {
654            method: req.method,
655            url: req.url,
656            headers: req.headers,
657            body: req.body,
658        };
659        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
660            env: ctx.env,
661            tenant: ctx.tenant,
662            tenant_id: ctx.tenant_id,
663            team: ctx.team,
664            team_id: ctx.team_id,
665            user: ctx.user,
666            user_id: ctx.user_id,
667            trace_id: ctx.trace_id,
668            correlation_id: ctx.correlation_id,
669            attributes: ctx.attributes,
670            session_id: ctx.session_id,
671            flow_id: ctx.flow_id,
672            node_id: ctx.node_id,
673            provider_id: ctx.provider_id,
674            deadline_ms: ctx.deadline_ms,
675            attempt: ctx.attempt,
676            idempotency_key: ctx.idempotency_key,
677            impersonation: ctx
678                .impersonation
679                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
680                    actor_id,
681                    reason,
682                }),
683        });
684
685        self.send_http_request(legacy_req, opts, legacy_ctx)
686            .map(|resp| HttpResponseV1_1 {
687                status: resp.status,
688                headers: resp.headers,
689                body: resp.body,
690            })
691            .map_err(|err| HttpClientErrorV1_1 {
692                code: err.code,
693                message: err.message,
694            })
695    }
696}
697
698impl StateStoreHost for HostState {
699    fn read(
700        &mut self,
701        key: HostStateKey,
702        ctx: Option<StateTenantCtx>,
703    ) -> Result<Vec<u8>, StateError> {
704        let store = match self.state_store.as_ref() {
705            Some(store) => store.clone(),
706            None => {
707                return Err(StateError {
708                    code: "unavailable".into(),
709                    message: "state store not configured".into(),
710                });
711            }
712        };
713        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
714            Ok(ctx) => ctx,
715            Err(err) => {
716                return Err(StateError {
717                    code: "invalid-ctx".into(),
718                    message: err.to_string(),
719                });
720            }
721        };
722        #[cfg(feature = "fault-injection")]
723        {
724            let exec_ctx = self.exec_ctx.as_ref();
725            let flow_id = exec_ctx
726                .map(|ctx| ctx.flow_id.as_str())
727                .unwrap_or("unknown");
728            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
729            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
730            let fault_ctx = FaultContext {
731                pack_id: self.pack_id.as_str(),
732                flow_id,
733                node_id,
734                attempt,
735            };
736            if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
737                return Err(StateError {
738                    code: "internal".into(),
739                    message: err.to_string(),
740                });
741            }
742        }
743        let key = StoreStateKey::from(key);
744        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
745            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
746            Ok(None) => Err(StateError {
747                code: "not_found".into(),
748                message: "state key not found".into(),
749            }),
750            Err(err) => Err(StateError {
751                code: "internal".into(),
752                message: err.to_string(),
753            }),
754        }
755    }
756
757    fn write(
758        &mut self,
759        key: HostStateKey,
760        bytes: Vec<u8>,
761        ctx: Option<StateTenantCtx>,
762    ) -> Result<StateOpAck, StateError> {
763        let store = match self.state_store.as_ref() {
764            Some(store) => store.clone(),
765            None => {
766                return Err(StateError {
767                    code: "unavailable".into(),
768                    message: "state store not configured".into(),
769                });
770            }
771        };
772        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
773            Ok(ctx) => ctx,
774            Err(err) => {
775                return Err(StateError {
776                    code: "invalid-ctx".into(),
777                    message: err.to_string(),
778                });
779            }
780        };
781        #[cfg(feature = "fault-injection")]
782        {
783            let exec_ctx = self.exec_ctx.as_ref();
784            let flow_id = exec_ctx
785                .map(|ctx| ctx.flow_id.as_str())
786                .unwrap_or("unknown");
787            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
788            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
789            let fault_ctx = FaultContext {
790                pack_id: self.pack_id.as_str(),
791                flow_id,
792                node_id,
793                attempt,
794            };
795            if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
796                return Err(StateError {
797                    code: "internal".into(),
798                    message: err.to_string(),
799                });
800            }
801        }
802        let key = StoreStateKey::from(key);
803        let value = serde_json::from_slice(&bytes)
804            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
805        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
806            Ok(()) => Ok(StateOpAck::Ok),
807            Err(err) => Err(StateError {
808                code: "internal".into(),
809                message: err.to_string(),
810            }),
811        }
812    }
813
814    fn delete(
815        &mut self,
816        key: HostStateKey,
817        ctx: Option<StateTenantCtx>,
818    ) -> Result<StateOpAck, StateError> {
819        let store = match self.state_store.as_ref() {
820            Some(store) => store.clone(),
821            None => {
822                return Err(StateError {
823                    code: "unavailable".into(),
824                    message: "state store not configured".into(),
825                });
826            }
827        };
828        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
829            Ok(ctx) => ctx,
830            Err(err) => {
831                return Err(StateError {
832                    code: "invalid-ctx".into(),
833                    message: err.to_string(),
834                });
835            }
836        };
837        let key = StoreStateKey::from(key);
838        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
839            Ok(_) => Ok(StateOpAck::Ok),
840            Err(err) => Err(StateError {
841                code: "internal".into(),
842                message: err.to_string(),
843            }),
844        }
845    }
846}
847
848impl TelemetryLoggerHost for HostState {
849    fn log(
850        &mut self,
851        span: TelemetrySpanContext,
852        fields: Vec<(String, String)>,
853        _ctx: Option<TelemetryTenantCtx>,
854    ) -> Result<TelemetryAck, TelemetryError> {
855        if let Some(mock) = &self.mocks
856            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
857        {
858            return Ok(TelemetryAck::Ok);
859        }
860        let mut map = serde_json::Map::new();
861        for (k, v) in fields {
862            map.insert(k, Value::String(v));
863        }
864        tracing::info!(
865            tenant = %span.tenant,
866            flow_id = %span.flow_id,
867            node = ?span.node_id,
868            provider = %span.provider,
869            fields = %serde_json::Value::Object(map.clone()),
870            "telemetry log from pack"
871        );
872        Ok(TelemetryAck::Ok)
873    }
874}
875
876impl RunnerHostHttp for HostState {
877    fn request(
878        &mut self,
879        method: String,
880        url: String,
881        headers: Vec<String>,
882        body: Option<Vec<u8>>,
883    ) -> Result<Vec<u8>, String> {
884        let req = HttpRequest {
885            method,
886            url,
887            headers: headers
888                .chunks(2)
889                .filter_map(|chunk| {
890                    if chunk.len() == 2 {
891                        Some((chunk[0].clone(), chunk[1].clone()))
892                    } else {
893                        None
894                    }
895                })
896                .collect(),
897            body,
898        };
899        match HttpClientHost::send(self, req, None) {
900            Ok(resp) => Ok(resp.body.unwrap_or_default()),
901            Err(err) => Err(err.message),
902        }
903    }
904}
905
/// Key/value host interface for components. This implementation stores
/// nothing: reads always miss and writes are discarded.
906impl RunnerHostKv for HostState {
    /// Always returns `None`; the namespace and key are ignored.
907    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
908        None
909    }
910
    /// Does nothing; the namespace, key and value are dropped.
911    fn put(&mut self, _ns: String, _key: String, _val: String) {}
912}
913
/// Outcome of reading a pack's `manifest.cbor`: the decoded manifest together
/// with the flows derived from it, tagged by which manifest format parsed.
914enum ManifestLoad {
    /// The bytes decoded with `decode_pack_manifest`.
915    New {
916        manifest: Box<greentic_types::PackManifest>,
917        flows: PackFlows,
918    },
    /// `decode_pack_manifest` failed and the bytes decoded as a
    /// `legacy_pack::PackManifest` instead.
919    Legacy {
920        manifest: Box<legacy_pack::PackManifest>,
921        flows: PackFlows,
922    },
923}
924
925fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
926    let mut archive = ZipArchive::new(File::open(path)?)
927        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
928    let bytes = read_entry(&mut archive, "manifest.cbor")
929        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
930    match decode_pack_manifest(&bytes) {
931        Ok(manifest) => {
932            let cache = PackFlows::from_manifest(manifest.clone());
933            Ok(ManifestLoad::New {
934                manifest: Box::new(manifest),
935                flows: cache,
936            })
937        }
938        Err(err) => {
939            tracing::debug!(
940                error = %err,
941                pack = %path.display(),
942                "decode_pack_manifest failed for archive; falling back to legacy manifest"
943            );
944            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
945                .context("failed to decode legacy pack manifest from manifest.cbor")?;
946            let flows = load_legacy_flows_from_archive(&mut archive, path, &legacy)?;
947            Ok(ManifestLoad::Legacy {
948                manifest: Box::new(legacy),
949                flows,
950            })
951        }
952    }
953}
954
955fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
956    let manifest_path = root.join("manifest.cbor");
957    let bytes = std::fs::read(&manifest_path)
958        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
959    match decode_pack_manifest(&bytes) {
960        Ok(manifest) => {
961            let cache = PackFlows::from_manifest(manifest.clone());
962            Ok(ManifestLoad::New {
963                manifest: Box::new(manifest),
964                flows: cache,
965            })
966        }
967        Err(err) => {
968            tracing::debug!(
969                error = %err,
970                pack = %root.display(),
971                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
972            );
973            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
974                .context("failed to decode legacy pack manifest from manifest.cbor")?;
975            let flows = load_legacy_flows_from_dir(root, &legacy)?;
976            Ok(ManifestLoad::Legacy {
977                manifest: Box::new(legacy),
978                flows,
979            })
980        }
981    }
982}
983
984fn load_legacy_flows_from_dir(
985    root: &Path,
986    manifest: &legacy_pack::PackManifest,
987) -> Result<PackFlows> {
988    build_legacy_flows(manifest, |rel_path| {
989        let path = root.join(rel_path);
990        std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
991    })
992}
993
994fn load_legacy_flows_from_archive(
995    archive: &mut ZipArchive<File>,
996    pack_path: &Path,
997    manifest: &legacy_pack::PackManifest,
998) -> Result<PackFlows> {
999    build_legacy_flows(manifest, |rel_path| {
1000        read_entry(archive, rel_path)
1001            .with_context(|| format!("missing flow json {} in {}", rel_path, pack_path.display()))
1002    })
1003}
1004
1005fn build_legacy_flows(
1006    manifest: &legacy_pack::PackManifest,
1007    mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1008) -> Result<PackFlows> {
1009    let mut flows = HashMap::new();
1010    let mut descriptors = Vec::new();
1011
1012    for entry in &manifest.flows {
1013        let bytes = read_json(&entry.file_json)
1014            .with_context(|| format!("missing flow json {}", entry.file_json))?;
1015        let doc = parse_flow_doc_with_legacy_aliases(&bytes, &entry.file_json)?;
1016        let normalized = normalize_flow_doc(doc);
1017        let flow_ir = flow_doc_to_ir(normalized)?;
1018        let flow = flow_ir_to_flow(flow_ir)?;
1019
1020        descriptors.push(FlowDescriptor {
1021            id: entry.id.clone(),
1022            flow_type: entry.kind.clone(),
1023            pack_id: manifest.meta.pack_id.clone(),
1024            profile: manifest.meta.pack_id.clone(),
1025            version: manifest.meta.version.to_string(),
1026            description: None,
1027        });
1028        flows.insert(entry.id.clone(), flow);
1029    }
1030
1031    let mut entry_flows = manifest.meta.entry_flows.clone();
1032    if entry_flows.is_empty() {
1033        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1034    }
1035    let metadata = PackMetadata {
1036        pack_id: manifest.meta.pack_id.clone(),
1037        version: manifest.meta.version.to_string(),
1038        entry_flows,
1039        secret_requirements: Vec::new(),
1040    };
1041
1042    Ok(PackFlows {
1043        descriptors,
1044        flows,
1045        metadata,
1046    })
1047}
1048
1049fn parse_flow_doc_with_legacy_aliases(bytes: &[u8], path: &str) -> Result<FlowDoc> {
1050    let mut value: Value = serde_json::from_slice(bytes)
1051        .with_context(|| format!("failed to decode flow doc {}", path))?;
1052    if let Some(map) = value.as_object_mut()
1053        && !map.contains_key("type")
1054        && let Some(flow_type) = map.remove("flow_type")
1055    {
1056        map.insert("type".to_string(), flow_type);
1057    }
1058    serde_json::from_value(value).with_context(|| format!("failed to decode flow doc {}", path))
1059}
1060
/// Per-store state for a running component: the host-side services plus the
/// WASI context and resource table handed out through `WasiView`.
1061pub struct ComponentState {
    /// Host services exposed to the component.
1062    pub host: HostState,
    // WASI context built from the `RunnerWasiPolicy` passed to `new`.
1063    wasi_ctx: WasiCtx,
    // Resource table paired with `wasi_ctx` in `WasiView::ctx`.
1064    resource_table: ResourceTable,
1065}
1066
1067impl ComponentState {
1068    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1069        let wasi_ctx = policy
1070            .instantiate()
1071            .context("failed to build WASI context")?;
1072        Ok(Self {
1073            host,
1074            wasi_ctx,
1075            resource_table: ResourceTable::new(),
1076        })
1077    }
1078
1079    fn host_mut(&mut self) -> &mut HostState {
1080        &mut self.host
1081    }
1082
1083    fn should_cancel_host(&mut self) -> bool {
1084        false
1085    }
1086
1087    fn yield_now_host(&mut self) {
1088        // no-op cooperative yield
1089    }
1090}
1091
1092impl component_api::v0_4::greentic::component::control::Host for ComponentState {
1093    fn should_cancel(&mut self) -> bool {
1094        self.should_cancel_host()
1095    }
1096
1097    fn yield_now(&mut self) {
1098        self.yield_now_host();
1099    }
1100}
1101
1102impl component_api::v0_5::greentic::component::control::Host for ComponentState {
1103    fn should_cancel(&mut self) -> bool {
1104        self.should_cancel_host()
1105    }
1106
1107    fn yield_now(&mut self) {
1108        self.yield_now_host();
1109    }
1110}
1111
1112fn add_component_control_instance(
1113    linker: &mut Linker<ComponentState>,
1114    name: &str,
1115) -> wasmtime::Result<()> {
1116    let mut inst = linker.instance(name)?;
1117    inst.func_wrap(
1118        "should-cancel",
1119        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1120            let host = caller.data_mut();
1121            Ok((host.should_cancel_host(),))
1122        },
1123    )?;
1124    inst.func_wrap(
1125        "yield-now",
1126        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1127            let host = caller.data_mut();
1128            host.yield_now_host();
1129            Ok(())
1130        },
1131    )?;
1132    Ok(())
1133}
1134
1135fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1136    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1137    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1138    Ok(())
1139}
1140
/// Adds WASI and the v1 host interfaces to `linker`.
///
/// Every enabled interface is served by the `HostState` inside
/// `ComponentState`. The state store is exposed only when `allow_state_store`
/// is true; `oauth_broker` and the un-suffixed `secrets_store` are left
/// unregistered (`None`).
///
/// # Errors
///
/// Propagates failures from `add_wasi_to_linker` and `add_all_v1_to_linker`.
1141pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1142    add_wasi_to_linker(linker)?;
1143    add_all_v1_to_linker(
1144        linker,
1145        HostFns {
1146            http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1147            http_client: Some(|state: &mut ComponentState| state.host_mut()),
1148            oauth_broker: None,
1149            runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1150            runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1151            telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
            // `then_some` yields `None` when the caller disallows state-store access.
1152            state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1153            secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1154            secrets_store: None,
1155        },
1156    )?;
1157    Ok(())
1158}
1159
1160impl OAuthHostContext for ComponentState {
1161    fn tenant_id(&self) -> &str {
1162        &self.host.config.tenant
1163    }
1164
1165    fn env(&self) -> &str {
1166        &self.host.default_env
1167    }
1168
1169    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
1170        &mut self.host.oauth_host
1171    }
1172
1173    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
1174        self.host.oauth_config.as_ref()
1175    }
1176}
1177
1178impl WasiView for ComponentState {
1179    fn ctx(&mut self) -> WasiCtxView<'_> {
1180        WasiCtxView {
1181            ctx: &mut self.wasi_ctx,
1182            table: &mut self.resource_table,
1183        }
1184    }
1185}
1186
// NOTE(review): these impls assert thread-safety for `ComponentState` by hand,
// bypassing the compiler's auto-trait check. The justification is not visible
// here - confirm that `HostState`, `WasiCtx` and `ResourceTable` are all sound
// to move and share across threads, and record that argument as a SAFETY note.
1187#[allow(unsafe_code)]
1188unsafe impl Send for ComponentState {}
1189#[allow(unsafe_code)]
1190unsafe impl Sync for ComponentState {}
1191
1192impl PackRuntime {
1193    fn allows_state_store(&self, component_ref: &str) -> bool {
1194        if self.state_store.is_none() {
1195            return false;
1196        }
1197        if !self.config.state_store_policy.allow {
1198            return false;
1199        }
1200        let Some(manifest) = self.component_manifests.get(component_ref) else {
1201            return false;
1202        };
1203        manifest
1204            .capabilities
1205            .host
1206            .state
1207            .as_ref()
1208            .map(|caps| caps.read || caps.write)
1209            .unwrap_or(false)
1210    }
1211
1212    pub fn contains_component(&self, component_ref: &str) -> bool {
1213        self.components.contains_key(component_ref)
1214    }
1215
1216    #[allow(clippy::too_many_arguments)]
1217    pub async fn load(
1218        path: impl AsRef<Path>,
1219        config: Arc<HostConfig>,
1220        mocks: Option<Arc<MockLayer>>,
1221        archive_source: Option<&Path>,
1222        session_store: Option<DynSessionStore>,
1223        state_store: Option<DynStateStore>,
1224        wasi_policy: Arc<RunnerWasiPolicy>,
1225        secrets: DynSecretsManager,
1226        oauth_config: Option<OAuthBrokerConfig>,
1227        verify_archive: bool,
1228        component_resolution: ComponentResolution,
1229    ) -> Result<Self> {
1230        let path = path.as_ref();
1231        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1232        let path_meta = std::fs::metadata(&safe_path).ok();
1233        let is_dir = path_meta
1234            .as_ref()
1235            .map(|meta| meta.is_dir())
1236            .unwrap_or(false);
1237        let is_component = !is_dir
1238            && safe_path
1239                .extension()
1240                .and_then(|ext| ext.to_str())
1241                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1242                .unwrap_or(false);
1243        let archive_hint_path = if let Some(source) = archive_source {
1244            let (_, normalized) = normalize_pack_path(source)?;
1245            Some(normalized)
1246        } else if is_component || is_dir {
1247            None
1248        } else {
1249            Some(safe_path.clone())
1250        };
1251        let archive_hint = archive_hint_path.as_deref();
1252        if verify_archive {
1253            if let Some(verify_target) = archive_hint.and_then(|p| {
1254                std::fs::metadata(p)
1255                    .ok()
1256                    .filter(|meta| meta.is_file())
1257                    .map(|_| p)
1258            }) {
1259                verify::verify_pack(verify_target).await?;
1260                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1261            } else {
1262                tracing::debug!("skipping archive verification (no archive source)");
1263            }
1264        }
1265        let engine = Engine::default();
1266        let engine_profile =
1267            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1268        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1269        let mut metadata = PackMetadata::fallback(&safe_path);
1270        let mut manifest = None;
1271        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1272        let mut flows = None;
1273        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1274            if is_dir {
1275                Some(safe_path.clone())
1276            } else {
1277                None
1278            }
1279        });
1280        let (pack_assets_dir, assets_tempdir) =
1281            locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
1282        let setup_yaml_exists = pack_assets_dir
1283            .as_ref()
1284            .map(|dir| dir.join("setup.yaml").is_file())
1285            .unwrap_or(false);
1286        tracing::info!(
1287            pack_root = %safe_path.display(),
1288            assets_setup_yaml_exists = setup_yaml_exists,
1289            "pack unpack metadata"
1290        );
1291
1292        if let Some(root) = materialized_root.as_ref() {
1293            match load_manifest_and_flows_from_dir(root) {
1294                Ok(ManifestLoad::New {
1295                    manifest: m,
1296                    flows: cache,
1297                }) => {
1298                    metadata = cache.metadata.clone();
1299                    manifest = Some(*m);
1300                    flows = Some(cache);
1301                }
1302                Ok(ManifestLoad::Legacy {
1303                    manifest: m,
1304                    flows: cache,
1305                }) => {
1306                    metadata = cache.metadata.clone();
1307                    legacy_manifest = Some(m);
1308                    flows = Some(cache);
1309                }
1310                Err(err) => {
1311                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1312                }
1313            }
1314        }
1315
1316        if manifest.is_none()
1317            && legacy_manifest.is_none()
1318            && let Some(archive_path) = archive_hint
1319        {
1320            let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
1321                format!(
1322                    "failed to load manifest.cbor from {}",
1323                    archive_path.display()
1324                )
1325            })?;
1326            match manifest_load {
1327                ManifestLoad::New {
1328                    manifest: m,
1329                    flows: cache,
1330                } => {
1331                    metadata = cache.metadata.clone();
1332                    manifest = Some(*m);
1333                    flows = Some(cache);
1334                }
1335                ManifestLoad::Legacy {
1336                    manifest: m,
1337                    flows: cache,
1338                } => {
1339                    metadata = cache.metadata.clone();
1340                    legacy_manifest = Some(m);
1341                    flows = Some(cache);
1342                }
1343            }
1344        }
1345        #[cfg(feature = "fault-injection")]
1346        {
1347            let fault_ctx = FaultContext {
1348                pack_id: metadata.pack_id.as_str(),
1349                flow_id: "unknown",
1350                node_id: None,
1351                attempt: 1,
1352            };
1353            maybe_fail(FaultPoint::PackResolve, fault_ctx)
1354                .map_err(|err| anyhow!(err.to_string()))?;
1355        }
1356        let mut pack_lock = None;
1357        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1358            pack_lock = load_pack_lock(&root)?;
1359            if pack_lock.is_some() {
1360                break;
1361            }
1362        }
1363        let component_sources_payload = if pack_lock.is_none() {
1364            if let Some(manifest) = manifest.as_ref() {
1365                manifest
1366                    .get_component_sources_v1()
1367                    .context("invalid component sources extension")?
1368            } else {
1369                None
1370            }
1371        } else {
1372            None
1373        };
1374        let component_sources = if let Some(lock) = pack_lock.as_ref() {
1375            Some(component_sources_table_from_pack_lock(
1376                lock,
1377                component_resolution.allow_missing_hash,
1378            )?)
1379        } else {
1380            component_sources_table(component_sources_payload.as_ref())?
1381        };
1382        let components = if is_component {
1383            let wasm_bytes = fs::read(&safe_path).await?;
1384            metadata = PackMetadata::from_wasm(&wasm_bytes)
1385                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1386            let name = safe_path
1387                .file_stem()
1388                .map(|s| s.to_string_lossy().to_string())
1389                .unwrap_or_else(|| "component".to_string());
1390            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1391            let mut map = HashMap::new();
1392            map.insert(
1393                name.clone(),
1394                PackComponent {
1395                    name,
1396                    version: metadata.version.clone(),
1397                    component,
1398                },
1399            );
1400            map
1401        } else {
1402            let specs = component_specs(
1403                manifest.as_ref(),
1404                legacy_manifest.as_deref(),
1405                component_sources_payload.as_ref(),
1406                pack_lock.as_ref(),
1407            );
1408            if specs.is_empty() {
1409                HashMap::new()
1410            } else {
1411                let mut loaded = HashMap::new();
1412                let mut missing: HashSet<String> =
1413                    specs.iter().map(|spec| spec.id.clone()).collect();
1414                let mut searched = Vec::new();
1415
1416                if !component_resolution.overrides.is_empty() {
1417                    load_components_from_overrides(
1418                        &cache,
1419                        &engine,
1420                        &component_resolution.overrides,
1421                        &specs,
1422                        &mut missing,
1423                        &mut loaded,
1424                    )
1425                    .await?;
1426                    searched.push("override map".to_string());
1427                }
1428
1429                if let Some(component_sources) = component_sources.as_ref() {
1430                    load_components_from_sources(
1431                        &cache,
1432                        &engine,
1433                        component_sources,
1434                        &component_resolution,
1435                        &specs,
1436                        &mut missing,
1437                        &mut loaded,
1438                        materialized_root.as_deref(),
1439                        archive_hint,
1440                    )
1441                    .await?;
1442                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1443                }
1444
1445                if let Some(root) = materialized_root.as_ref() {
1446                    load_components_from_dir(
1447                        &cache,
1448                        &engine,
1449                        root,
1450                        &specs,
1451                        &mut missing,
1452                        &mut loaded,
1453                    )
1454                    .await?;
1455                    searched.push(format!("components dir {}", root.display()));
1456                }
1457
1458                if let Some(archive_path) = archive_hint {
1459                    load_components_from_archive(
1460                        &cache,
1461                        &engine,
1462                        archive_path,
1463                        &specs,
1464                        &mut missing,
1465                        &mut loaded,
1466                    )
1467                    .await?;
1468                    searched.push(format!("archive {}", archive_path.display()));
1469                }
1470
1471                if !missing.is_empty() {
1472                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1473                    let sources = if searched.is_empty() {
1474                        "no component sources".to_string()
1475                    } else {
1476                        searched.join(", ")
1477                    };
1478                    bail!(
1479                        "components missing: {}; looked in {}",
1480                        missing_list,
1481                        sources
1482                    );
1483                }
1484
1485                loaded
1486            }
1487        };
1488        let http_client = Arc::clone(&HTTP_CLIENT);
1489        let mut component_manifests = HashMap::new();
1490        if let Some(manifest) = manifest.as_ref() {
1491            for component in &manifest.components {
1492                component_manifests.insert(component.id.as_str().to_string(), component.clone());
1493            }
1494        }
1495        let mut pack_policy = (*wasi_policy).clone();
1496        if let Some(dir) = pack_assets_dir {
1497            tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
1498            pack_policy =
1499                pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
1500        }
1501        let wasi_policy = Arc::new(pack_policy);
1502        Ok(Self {
1503            path: safe_path,
1504            archive_path: archive_hint.map(Path::to_path_buf),
1505            config,
1506            engine,
1507            metadata,
1508            manifest,
1509            legacy_manifest,
1510            component_manifests,
1511            mocks,
1512            flows,
1513            components,
1514            http_client,
1515            pre_cache: Mutex::new(HashMap::new()),
1516            session_store,
1517            state_store,
1518            wasi_policy,
1519            assets_tempdir,
1520            provider_registry: RwLock::new(None),
1521            secrets,
1522            oauth_config,
1523            cache,
1524        })
1525    }
1526
1527    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1528        if let Some(cache) = &self.flows {
1529            return Ok(cache.descriptors.clone());
1530        }
1531        if let Some(manifest) = &self.manifest {
1532            let descriptors = manifest
1533                .flows
1534                .iter()
1535                .map(|flow| FlowDescriptor {
1536                    id: flow.id.as_str().to_string(),
1537                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1538                    pack_id: manifest.pack_id.as_str().to_string(),
1539                    profile: manifest.pack_id.as_str().to_string(),
1540                    version: manifest.version.to_string(),
1541                    description: None,
1542                })
1543                .collect();
1544            return Ok(descriptors);
1545        }
1546        Ok(Vec::new())
1547    }
1548
1549    #[allow(dead_code)]
1550    pub async fn run_flow(
1551        &self,
1552        flow_id: &str,
1553        input: serde_json::Value,
1554    ) -> Result<serde_json::Value> {
1555        let pack = Arc::new(
1556            PackRuntime::load(
1557                &self.path,
1558                Arc::clone(&self.config),
1559                self.mocks.clone(),
1560                self.archive_path.as_deref(),
1561                self.session_store.clone(),
1562                self.state_store.clone(),
1563                Arc::clone(&self.wasi_policy),
1564                self.secrets.clone(),
1565                self.oauth_config.clone(),
1566                false,
1567                ComponentResolution::default(),
1568            )
1569            .await?,
1570        );
1571
1572        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1573        let retry_config = self.config.retry_config().into();
1574        let mocks = pack.mocks.as_deref();
1575        let tenant = self.config.tenant.as_str();
1576
1577        let ctx = FlowContext {
1578            tenant,
1579            pack_id: pack.metadata().pack_id.as_str(),
1580            flow_id,
1581            node_id: None,
1582            tool: None,
1583            action: None,
1584            session_id: None,
1585            provider_id: None,
1586            retry_config,
1587            attempt: 1,
1588            observer: None,
1589            mocks,
1590        };
1591
1592        let execution = engine.execute(ctx, input).await?;
1593        match execution.status {
1594            FlowStatus::Completed => Ok(execution.output),
1595            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1596                "status": "pending",
1597                "reason": wait.reason,
1598                "resume": wait.snapshot,
1599                "response": execution.output,
1600            })),
1601        }
1602    }
1603
1604    pub async fn invoke_component(
1605        &self,
1606        component_ref: &str,
1607        ctx: ComponentExecCtx,
1608        operation: &str,
1609        _config_json: Option<String>,
1610        input_json: String,
1611    ) -> Result<Value> {
1612        let pack_component = self
1613            .components
1614            .get(component_ref)
1615            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1616        let engine = self.engine.clone();
1617        let config = Arc::clone(&self.config);
1618        let http_client = Arc::clone(&self.http_client);
1619        let mocks = self.mocks.clone();
1620        let session_store = self.session_store.clone();
1621        let state_store = self.state_store.clone();
1622        let secrets = Arc::clone(&self.secrets);
1623        let oauth_config = self.oauth_config.clone();
1624        let wasi_policy = Arc::clone(&self.wasi_policy);
1625        let pack_id = self.metadata().pack_id.clone();
1626        let allow_state_store = self.allows_state_store(component_ref);
1627        let component = pack_component.component.clone();
1628        let component_ref_owned = component_ref.to_string();
1629        let operation_owned = operation.to_string();
1630        let input_owned = input_json;
1631        let ctx_owned = ctx;
1632
1633        run_on_wasi_thread("component.invoke", move || {
1634            let mut linker = Linker::new(&engine);
1635            register_all(&mut linker, allow_state_store)?;
1636            add_component_control_to_linker(&mut linker)?;
1637
1638            let host_state = HostState::new(
1639                pack_id.clone(),
1640                config,
1641                http_client,
1642                mocks,
1643                session_store,
1644                state_store,
1645                secrets,
1646                oauth_config,
1647                Some(ctx_owned.clone()),
1648                Some(component_ref_owned.clone()),
1649                false,
1650            )?;
1651            let store_state = ComponentState::new(host_state, wasi_policy)?;
1652            let mut store = wasmtime::Store::new(&engine, store_state);
1653
1654            let invoke_result = HostState::instantiate_component_result(
1655                &mut linker,
1656                &mut store,
1657                &component,
1658                &ctx_owned,
1659                &operation_owned,
1660                &input_owned,
1661            )?;
1662            HostState::convert_invoke_result(invoke_result)
1663        })
1664    }
1665
1666    pub fn resolve_provider(
1667        &self,
1668        provider_id: Option<&str>,
1669        provider_type: Option<&str>,
1670    ) -> Result<ProviderBinding> {
1671        let registry = self.provider_registry()?;
1672        registry.resolve(provider_id, provider_type)
1673    }
1674
1675    pub async fn invoke_provider(
1676        &self,
1677        binding: &ProviderBinding,
1678        ctx: ComponentExecCtx,
1679        op: &str,
1680        input_json: Vec<u8>,
1681    ) -> Result<Value> {
1682        let component_ref_owned = binding.component_ref.clone();
1683        let pack_component = self.components.get(&component_ref_owned).with_context(|| {
1684            format!("provider component '{component_ref_owned}' not found in pack")
1685        })?;
1686        let component = pack_component.component.clone();
1687
1688        let engine = self.engine.clone();
1689        let config = Arc::clone(&self.config);
1690        let http_client = Arc::clone(&self.http_client);
1691        let mocks = self.mocks.clone();
1692        let session_store = self.session_store.clone();
1693        let state_store = self.state_store.clone();
1694        let secrets = Arc::clone(&self.secrets);
1695        let oauth_config = self.oauth_config.clone();
1696        let wasi_policy = Arc::clone(&self.wasi_policy);
1697        let pack_id = self.metadata().pack_id.clone();
1698        let allow_state_store = self.allows_state_store(&component_ref_owned);
1699        let input_owned = input_json;
1700        let op_owned = op.to_string();
1701        let ctx_owned = ctx;
1702
1703        run_on_wasi_thread("provider.invoke", move || {
1704            let mut linker = Linker::new(&engine);
1705            register_all(&mut linker, allow_state_store)?;
1706            add_component_control_to_linker(&mut linker)?;
1707            let pre_instance = linker.instantiate_pre(component.as_ref())?;
1708            let pre: ProviderComponentPre<ComponentState> =
1709                ProviderComponentPre::new(pre_instance)?;
1710
1711            let host_state = HostState::new(
1712                pack_id.clone(),
1713                config,
1714                http_client,
1715                mocks,
1716                session_store,
1717                state_store,
1718                secrets,
1719                oauth_config,
1720                Some(ctx_owned.clone()),
1721                Some(component_ref_owned.clone()),
1722                true,
1723            )?;
1724            let store_state = ComponentState::new(host_state, wasi_policy)?;
1725            let mut store = wasmtime::Store::new(&engine, store_state);
1726            let bindings: crate::provider_core::SchemaCore =
1727                block_on(async { pre.instantiate_async(&mut store).await })?;
1728            let provider = bindings.greentic_provider_core_schema_core_api();
1729
1730            let result = provider.call_invoke(&mut store, &op_owned, &input_owned)?;
1731            deserialize_json_bytes(result)
1732        })
1733    }
1734
1735    pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
1736        if let Some(registry) = self.provider_registry.read().clone() {
1737            return Ok(registry);
1738        }
1739        let manifest = self
1740            .manifest
1741            .as_ref()
1742            .context("pack manifest required for provider resolution")?;
1743        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1744        let registry = ProviderRegistry::new(
1745            manifest,
1746            self.state_store.clone(),
1747            &self.config.tenant,
1748            &env,
1749        )?;
1750        *self.provider_registry.write() = Some(registry.clone());
1751        Ok(registry)
1752    }
1753
1754    pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
1755        if self.manifest.is_none() {
1756            return Ok(None);
1757        }
1758        Ok(Some(self.provider_registry()?))
1759    }
1760
1761    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1762        if let Some(cache) = &self.flows {
1763            return cache
1764                .flows
1765                .get(flow_id)
1766                .cloned()
1767                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1768        }
1769        if let Some(manifest) = &self.manifest {
1770            let entry = manifest
1771                .flows
1772                .iter()
1773                .find(|f| f.id.as_str() == flow_id)
1774                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1775            return Ok(entry.flow.clone());
1776        }
1777        bail!("flow '{flow_id}' not available (pack exports disabled)")
1778    }
1779
1780    pub fn metadata(&self) -> &PackMetadata {
1781        &self.metadata
1782    }
1783
1784    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1785        &self.metadata.secret_requirements
1786    }
1787
1788    pub fn missing_secrets(
1789        &self,
1790        tenant_ctx: &TypesTenantCtx,
1791    ) -> Vec<greentic_types::SecretRequirement> {
1792        let env = tenant_ctx.env.as_str().to_string();
1793        let tenant = tenant_ctx.tenant.as_str().to_string();
1794        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1795        self.required_secrets()
1796            .iter()
1797            .filter(|req| {
1798                // scope must match current context if provided
1799                if let Some(scope) = &req.scope {
1800                    if scope.env != env {
1801                        return false;
1802                    }
1803                    if scope.tenant != tenant {
1804                        return false;
1805                    }
1806                    if let Some(ref team_req) = scope.team
1807                        && team.as_ref() != Some(team_req)
1808                    {
1809                        return false;
1810                    }
1811                }
1812                let ctx = self.config.tenant_ctx();
1813                read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1814            })
1815            .cloned()
1816            .collect()
1817    }
1818
1819    pub fn for_component_test(
1820        components: Vec<(String, PathBuf)>,
1821        flows: HashMap<String, FlowIR>,
1822        pack_id: &str,
1823        config: Arc<HostConfig>,
1824    ) -> Result<Self> {
1825        let engine = Engine::default();
1826        let engine_profile =
1827            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1828        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1829        let mut component_map = HashMap::new();
1830        for (name, path) in components {
1831            if !path.exists() {
1832                bail!("component artifact missing: {}", path.display());
1833            }
1834            let wasm_bytes = std::fs::read(&path)?;
1835            let component = Arc::new(
1836                Component::from_binary(&engine, &wasm_bytes)
1837                    .with_context(|| format!("failed to compile component {}", path.display()))?,
1838            );
1839            component_map.insert(
1840                name.clone(),
1841                PackComponent {
1842                    name,
1843                    version: "0.0.0".into(),
1844                    component,
1845                },
1846            );
1847        }
1848
1849        let mut flow_map = HashMap::new();
1850        let mut descriptors = Vec::new();
1851        for (id, ir) in flows {
1852            let flow_type = ir.flow_type.clone();
1853            let flow = flow_ir_to_flow(ir)?;
1854            flow_map.insert(id.clone(), flow);
1855            descriptors.push(FlowDescriptor {
1856                id: id.clone(),
1857                flow_type,
1858                pack_id: pack_id.to_string(),
1859                profile: "test".into(),
1860                version: "0.0.0".into(),
1861                description: None,
1862            });
1863        }
1864        let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1865        let metadata = PackMetadata {
1866            pack_id: pack_id.to_string(),
1867            version: "0.0.0".into(),
1868            entry_flows,
1869            secret_requirements: Vec::new(),
1870        };
1871        let flows_cache = PackFlows {
1872            descriptors: descriptors.clone(),
1873            flows: flow_map,
1874            metadata: metadata.clone(),
1875        };
1876
1877        Ok(Self {
1878            path: PathBuf::new(),
1879            archive_path: None,
1880            config,
1881            engine,
1882            metadata,
1883            manifest: None,
1884            legacy_manifest: None,
1885            component_manifests: HashMap::new(),
1886            mocks: None,
1887            flows: Some(flows_cache),
1888            components: component_map,
1889            http_client: Arc::clone(&HTTP_CLIENT),
1890            pre_cache: Mutex::new(HashMap::new()),
1891            session_store: None,
1892            state_store: None,
1893            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1894            assets_tempdir: None,
1895            provider_registry: RwLock::new(None),
1896            secrets: crate::secrets::default_manager()?,
1897            oauth_config: None,
1898            cache,
1899        })
1900    }
1901}
1902
1903fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1904    let message = err.to_string();
1905    message.contains("no exported instance named")
1906        && message.contains(&format!("greentic:component/node@{version}"))
1907}
1908
1909struct PackFlows {
1910    descriptors: Vec<FlowDescriptor>,
1911    flows: HashMap<String, Flow>,
1912    metadata: PackMetadata,
1913}
1914
/// Manifest extension keys that are accepted as carrying runtime flow definitions.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1920
1921#[derive(Debug, Deserialize)]
1922struct RuntimeFlowBundle {
1923    flows: Vec<RuntimeFlow>,
1924}
1925
1926#[derive(Debug, Deserialize)]
1927struct RuntimeFlow {
1928    id: String,
1929    #[serde(alias = "flow_type")]
1930    kind: FlowKind,
1931    #[serde(default)]
1932    schema_version: Option<String>,
1933    #[serde(default)]
1934    start: Option<String>,
1935    #[serde(default)]
1936    entrypoints: BTreeMap<String, Value>,
1937    nodes: BTreeMap<String, RuntimeNode>,
1938    #[serde(default)]
1939    metadata: Option<FlowMetadata>,
1940}
1941
1942#[derive(Debug, Deserialize)]
1943struct RuntimeNode {
1944    #[serde(alias = "component")]
1945    component_id: String,
1946    #[serde(default, alias = "operation")]
1947    operation_name: Option<String>,
1948    #[serde(default, alias = "payload", alias = "input")]
1949    operation_payload: Value,
1950    #[serde(default)]
1951    routing: Option<Routing>,
1952    #[serde(default)]
1953    telemetry: Option<TelemetryHints>,
1954}
1955
1956fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1957    if bytes.is_empty() {
1958        return Ok(Value::Null);
1959    }
1960    serde_json::from_slice(&bytes).or_else(|_| {
1961        String::from_utf8(bytes)
1962            .map(Value::String)
1963            .map_err(|err| anyhow!(err))
1964    })
1965}
1966
1967impl PackFlows {
1968    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1969        if let Some(flows) = flows_from_runtime_extension(&manifest) {
1970            return flows;
1971        }
1972        let descriptors = manifest
1973            .flows
1974            .iter()
1975            .map(|entry| FlowDescriptor {
1976                id: entry.id.as_str().to_string(),
1977                flow_type: flow_kind_to_str(entry.kind).to_string(),
1978                pack_id: manifest.pack_id.as_str().to_string(),
1979                profile: manifest.pack_id.as_str().to_string(),
1980                version: manifest.version.to_string(),
1981                description: None,
1982            })
1983            .collect();
1984        let mut flows = HashMap::new();
1985        for entry in &manifest.flows {
1986            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1987        }
1988        Self {
1989            metadata: PackMetadata::from_manifest(&manifest),
1990            descriptors,
1991            flows,
1992        }
1993    }
1994}
1995
1996fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1997    let extensions = manifest.extensions.as_ref()?;
1998    let extension = extensions.iter().find_map(|(key, ext)| {
1999        if RUNTIME_FLOW_EXTENSION_IDS
2000            .iter()
2001            .any(|candidate| candidate == key)
2002        {
2003            Some(ext)
2004        } else {
2005            None
2006        }
2007    })?;
2008    let runtime_flows = match decode_runtime_flow_extension(extension) {
2009        Some(flows) if !flows.is_empty() => flows,
2010        _ => return None,
2011    };
2012
2013    let descriptors = runtime_flows
2014        .iter()
2015        .map(|flow| FlowDescriptor {
2016            id: flow.id.as_str().to_string(),
2017            flow_type: flow_kind_to_str(flow.kind).to_string(),
2018            pack_id: manifest.pack_id.as_str().to_string(),
2019            profile: manifest.pack_id.as_str().to_string(),
2020            version: manifest.version.to_string(),
2021            description: None,
2022        })
2023        .collect::<Vec<_>>();
2024    let flows = runtime_flows
2025        .into_iter()
2026        .map(|flow| (flow.id.as_str().to_string(), flow))
2027        .collect();
2028
2029    Some(PackFlows {
2030        metadata: PackMetadata::from_manifest(manifest),
2031        descriptors,
2032        flows,
2033    })
2034}
2035
2036fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
2037    let value = match extension.inline.as_ref()? {
2038        ExtensionInline::Other(value) => value.clone(),
2039        _ => return None,
2040    };
2041
2042    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
2043        return Some(collect_runtime_flows(bundle.flows));
2044    }
2045
2046    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
2047        return Some(collect_runtime_flows(flows));
2048    }
2049
2050    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
2051        return Some(flows);
2052    }
2053
2054    warn!(
2055        extension = %extension.kind,
2056        version = %extension.version,
2057        "runtime flow extension present but could not be decoded"
2058    );
2059    None
2060}
2061
2062fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
2063    flows
2064        .into_iter()
2065        .filter_map(|flow| match runtime_flow_to_flow(flow) {
2066            Ok(flow) => Some(flow),
2067            Err(err) => {
2068                warn!(error = %err, "failed to decode runtime flow");
2069                None
2070            }
2071        })
2072        .collect()
2073}
2074
2075fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
2076    let flow_id = FlowId::from_str(&runtime.id)
2077        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
2078    let mut entrypoints = runtime.entrypoints;
2079    if entrypoints.is_empty()
2080        && let Some(start) = &runtime.start
2081    {
2082        entrypoints.insert("default".into(), Value::String(start.clone()));
2083    }
2084
2085    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
2086    for (id, node) in runtime.nodes {
2087        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
2088        let component_id = ComponentId::from_str(&node.component_id)
2089            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
2090        let component = FlowComponentRef {
2091            id: component_id,
2092            pack_alias: None,
2093            operation: node.operation_name,
2094        };
2095        let routing = node.routing.unwrap_or(Routing::End);
2096        let telemetry = node.telemetry.unwrap_or_default();
2097        nodes.insert(
2098            node_id.clone(),
2099            Node {
2100                id: node_id,
2101                component,
2102                input: InputMapping {
2103                    mapping: node.operation_payload,
2104                },
2105                output: OutputMapping {
2106                    mapping: Value::Null,
2107                },
2108                routing,
2109                telemetry,
2110            },
2111        );
2112    }
2113
2114    Ok(Flow {
2115        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
2116        id: flow_id,
2117        kind: runtime.kind,
2118        entrypoints,
2119        nodes,
2120        metadata: runtime.metadata.unwrap_or_default(),
2121    })
2122}
2123
2124fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
2125    match kind {
2126        greentic_types::FlowKind::Messaging => "messaging",
2127        greentic_types::FlowKind::Event => "event",
2128        greentic_types::FlowKind::ComponentConfig => "component-config",
2129        greentic_types::FlowKind::Job => "job",
2130        greentic_types::FlowKind::Http => "http",
2131    }
2132}
2133
2134fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
2135    let mut file = archive
2136        .by_name(name)
2137        .with_context(|| format!("entry {name} missing from archive"))?;
2138    let mut buf = Vec::new();
2139    file.read_to_end(&mut buf)?;
2140    Ok(buf)
2141}
2142
2143fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
2144    for node in doc.nodes.values_mut() {
2145        let Some((component_ref, payload)) = node
2146            .raw
2147            .iter()
2148            .next()
2149            .map(|(key, value)| (key.clone(), value.clone()))
2150        else {
2151            continue;
2152        };
2153        if component_ref.starts_with("emit.") {
2154            node.operation = Some(component_ref);
2155            node.payload = payload;
2156            node.raw.clear();
2157            continue;
2158        }
2159        let (target_component, operation, input, config) =
2160            infer_component_exec(&payload, &component_ref);
2161        let mut payload_obj = serde_json::Map::new();
2162        // component.exec is meta; ensure the payload carries the actual target component.
2163        payload_obj.insert("component".into(), Value::String(target_component));
2164        payload_obj.insert("operation".into(), Value::String(operation));
2165        payload_obj.insert("input".into(), input);
2166        if let Some(cfg) = config {
2167            payload_obj.insert("config".into(), cfg);
2168        }
2169        node.operation = Some("component.exec".to_string());
2170        node.payload = Value::Object(payload_obj);
2171        node.raw.clear();
2172    }
2173    doc
2174}
2175
2176fn infer_component_exec(
2177    payload: &Value,
2178    component_ref: &str,
2179) -> (String, String, Value, Option<Value>) {
2180    let default_op = if component_ref.starts_with("templating.") {
2181        "render"
2182    } else {
2183        "invoke"
2184    }
2185    .to_string();
2186
2187    if let Value::Object(map) = payload {
2188        let op = map
2189            .get("op")
2190            .or_else(|| map.get("operation"))
2191            .and_then(Value::as_str)
2192            .map(|s| s.to_string())
2193            .unwrap_or_else(|| default_op.clone());
2194
2195        let mut input = map.clone();
2196        let config = input.remove("config");
2197        let component = input
2198            .get("component")
2199            .or_else(|| input.get("component_ref"))
2200            .and_then(Value::as_str)
2201            .map(|s| s.to_string())
2202            .unwrap_or_else(|| component_ref.to_string());
2203        input.remove("component");
2204        input.remove("component_ref");
2205        input.remove("op");
2206        input.remove("operation");
2207        return (component, op, Value::Object(input), config);
2208    }
2209
2210    (component_ref.to_string(), default_op, payload.clone(), None)
2211}
2212
/// Identity of a component the pack expects to load.
#[derive(Debug, Clone)]
struct ComponentSpec {
    /// Component identifier.
    id: String,
    /// Component version as text.
    version: String,
    /// Wasm file path taken from a legacy manifest entry, when there is one.
    legacy_path: Option<String>,
}
2219
2220#[derive(Clone, Debug)]
2221struct ComponentSourceInfo {
2222    digest: Option<String>,
2223    source: ComponentSourceRef,
2224    artifact: ComponentArtifactLocation,
2225    expected_wasm_sha256: Option<String>,
2226    skip_digest_verification: bool,
2227}
2228
/// Location of a component's wasm artifact.
#[derive(Debug, Clone)]
enum ComponentArtifactLocation {
    /// The artifact is shipped with the pack at `wasm_path`.
    Inline { wasm_path: String },
    /// The artifact is not shipped with the pack.
    Remote,
}
2234
2235#[derive(Clone, Debug, Deserialize)]
2236struct PackLockV1 {
2237    schema_version: u32,
2238    components: Vec<PackLockComponent>,
2239}
2240
2241#[derive(Clone, Debug, Deserialize)]
2242struct PackLockComponent {
2243    name: String,
2244    #[serde(default, rename = "source_ref")]
2245    source_ref: Option<String>,
2246    #[serde(default, rename = "ref")]
2247    legacy_ref: Option<String>,
2248    #[serde(default)]
2249    component_id: Option<ComponentId>,
2250    #[serde(default)]
2251    bundled: Option<bool>,
2252    #[serde(default, rename = "bundled_path")]
2253    bundled_path: Option<String>,
2254    #[serde(default, rename = "path")]
2255    legacy_path: Option<String>,
2256    #[serde(default)]
2257    wasm_sha256: Option<String>,
2258    #[serde(default, rename = "sha256")]
2259    legacy_sha256: Option<String>,
2260    #[serde(default)]
2261    resolved_digest: Option<String>,
2262    #[serde(default)]
2263    digest: Option<String>,
2264}
2265
2266fn component_specs(
2267    manifest: Option<&greentic_types::PackManifest>,
2268    legacy_manifest: Option<&legacy_pack::PackManifest>,
2269    component_sources: Option<&ComponentSourcesV1>,
2270    pack_lock: Option<&PackLockV1>,
2271) -> Vec<ComponentSpec> {
2272    if let Some(manifest) = manifest {
2273        if !manifest.components.is_empty() {
2274            return manifest
2275                .components
2276                .iter()
2277                .map(|entry| ComponentSpec {
2278                    id: entry.id.as_str().to_string(),
2279                    version: entry.version.to_string(),
2280                    legacy_path: None,
2281                })
2282                .collect();
2283        }
2284        if let Some(lock) = pack_lock {
2285            let mut seen = HashSet::new();
2286            let mut specs = Vec::new();
2287            for entry in &lock.components {
2288                let id = entry
2289                    .component_id
2290                    .as_ref()
2291                    .map(|id| id.as_str())
2292                    .unwrap_or(entry.name.as_str());
2293                if seen.insert(id.to_string()) {
2294                    specs.push(ComponentSpec {
2295                        id: id.to_string(),
2296                        version: "0.0.0".to_string(),
2297                        legacy_path: None,
2298                    });
2299                }
2300            }
2301            return specs;
2302        }
2303        if let Some(sources) = component_sources {
2304            let mut seen = HashSet::new();
2305            let mut specs = Vec::new();
2306            for entry in &sources.components {
2307                let id = entry
2308                    .component_id
2309                    .as_ref()
2310                    .map(|id| id.as_str())
2311                    .unwrap_or(entry.name.as_str());
2312                if seen.insert(id.to_string()) {
2313                    specs.push(ComponentSpec {
2314                        id: id.to_string(),
2315                        version: "0.0.0".to_string(),
2316                        legacy_path: None,
2317                    });
2318                }
2319            }
2320            return specs;
2321        }
2322    }
2323    if let Some(legacy_manifest) = legacy_manifest {
2324        return legacy_manifest
2325            .components
2326            .iter()
2327            .map(|entry| ComponentSpec {
2328                id: entry.name.clone(),
2329                version: entry.version.to_string(),
2330                legacy_path: Some(entry.file_wasm.clone()),
2331            })
2332            .collect();
2333    }
2334    Vec::new()
2335}
2336
2337fn component_sources_table(
2338    sources: Option<&ComponentSourcesV1>,
2339) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2340    let Some(sources) = sources else {
2341        return Ok(None);
2342    };
2343    let mut table = HashMap::new();
2344    for entry in &sources.components {
2345        let artifact = match &entry.artifact {
2346            ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2347                wasm_path: wasm_path.clone(),
2348            },
2349            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2350        };
2351        let info = ComponentSourceInfo {
2352            digest: Some(entry.resolved.digest.clone()),
2353            source: entry.source.clone(),
2354            artifact,
2355            expected_wasm_sha256: None,
2356            skip_digest_verification: false,
2357        };
2358        if let Some(component_id) = entry.component_id.as_ref() {
2359            table.insert(component_id.as_str().to_string(), info.clone());
2360        }
2361        table.insert(entry.name.clone(), info);
2362    }
2363    Ok(Some(table))
2364}
2365
2366fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2367    let lock_path = if path.is_dir() {
2368        let candidate = path.join("pack.lock");
2369        if candidate.exists() {
2370            Some(candidate)
2371        } else {
2372            let candidate = path.join("pack.lock.json");
2373            candidate.exists().then_some(candidate)
2374        }
2375    } else {
2376        None
2377    };
2378    let Some(lock_path) = lock_path else {
2379        return Ok(None);
2380    };
2381    let raw = std::fs::read_to_string(&lock_path)
2382        .with_context(|| format!("failed to read {}", lock_path.display()))?;
2383    let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2384    if lock.schema_version != 1 {
2385        bail!("pack.lock schema_version must be 1");
2386    }
2387    Ok(Some(lock))
2388}
2389
/// Lists the directories in which a pack lock should be looked for.
///
/// For a directory pack the only root is `pack_path` itself. Otherwise the
/// roots are the parent and grandparent (where they exist) of `archive_hint`,
/// or of `pack_path` when no hint is given.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    let anchor = archive_hint.unwrap_or(pack_path);
    // `ancestors` starts with the path itself; skip it and keep up to two levels.
    anchor
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
2414
2415fn normalize_sha256(digest: &str) -> Result<String> {
2416    let trimmed = digest.trim();
2417    if trimmed.is_empty() {
2418        bail!("sha256 digest cannot be empty");
2419    }
2420    if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2421        if stripped.is_empty() {
2422            bail!("sha256 digest must include hex bytes after sha256:");
2423        }
2424        return Ok(trimmed.to_string());
2425    }
2426    if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2427        return Ok(format!("sha256:{trimmed}"));
2428    }
2429    bail!("sha256 digest must be hex or sha256:<hex>");
2430}
2431
2432fn component_sources_table_from_pack_lock(
2433    lock: &PackLockV1,
2434    allow_missing_hash: bool,
2435) -> Result<HashMap<String, ComponentSourceInfo>> {
2436    let mut table = HashMap::new();
2437    let mut names = HashSet::new();
2438    for entry in &lock.components {
2439        if !names.insert(entry.name.clone()) {
2440            bail!(
2441                "pack.lock contains duplicate component name `{}`",
2442                entry.name
2443            );
2444        }
2445        let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2446            (Some(primary), Some(legacy)) => {
2447                if primary != legacy {
2448                    bail!(
2449                        "pack.lock component {} has conflicting refs: {} vs {}",
2450                        entry.name,
2451                        primary,
2452                        legacy
2453                    );
2454                }
2455                primary.as_str()
2456            }
2457            (Some(primary), None) => primary.as_str(),
2458            (None, Some(legacy)) => legacy.as_str(),
2459            (None, None) => {
2460                bail!("pack.lock component {} missing source_ref", entry.name);
2461            }
2462        };
2463        let source: ComponentSourceRef = source_ref
2464            .parse()
2465            .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2466        let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2467            (Some(primary), Some(legacy)) => {
2468                if primary != legacy {
2469                    bail!(
2470                        "pack.lock component {} has conflicting bundled paths: {} vs {}",
2471                        entry.name,
2472                        primary,
2473                        legacy
2474                    );
2475                }
2476                Some(primary.clone())
2477            }
2478            (Some(primary), None) => Some(primary.clone()),
2479            (None, Some(legacy)) => Some(legacy.clone()),
2480            (None, None) => None,
2481        };
2482        let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2483        let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2484            let wasm_path = bundled_path.ok_or_else(|| {
2485                anyhow!(
2486                    "pack.lock component {} marked bundled but bundled_path is missing",
2487                    entry.name
2488                )
2489            })?;
2490            let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2491                (Some(primary), Some(legacy)) => {
2492                    if primary != legacy {
2493                        bail!(
2494                            "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2495                            entry.name,
2496                            primary,
2497                            legacy
2498                        );
2499                    }
2500                    Some(primary.as_str())
2501                }
2502                (Some(primary), None) => Some(primary.as_str()),
2503                (None, Some(legacy)) => Some(legacy.as_str()),
2504                (None, None) => None,
2505            };
2506            let expected = match expected_raw {
2507                Some(value) => Some(normalize_sha256(value)?),
2508                None => None,
2509            };
2510            if expected.is_none() && !allow_missing_hash {
2511                bail!(
2512                    "pack.lock component {} missing wasm_sha256 for bundled component",
2513                    entry.name
2514                );
2515            }
2516            (
2517                ComponentArtifactLocation::Inline { wasm_path },
2518                expected.clone(),
2519                expected,
2520                allow_missing_hash && expected_raw.is_none(),
2521            )
2522        } else {
2523            if source.is_tag() {
2524                bail!(
2525                    "component {} uses tag ref {} but is not bundled; rebuild the pack",
2526                    entry.name,
2527                    source
2528                );
2529            }
2530            let expected = entry
2531                .resolved_digest
2532                .as_deref()
2533                .or(entry.digest.as_deref())
2534                .ok_or_else(|| {
2535                    anyhow!(
2536                        "pack.lock component {} missing resolved_digest for remote component",
2537                        entry.name
2538                    )
2539                })?;
2540            (
2541                ComponentArtifactLocation::Remote,
2542                Some(normalize_digest(expected)),
2543                None,
2544                false,
2545            )
2546        };
2547        let info = ComponentSourceInfo {
2548            digest,
2549            source,
2550            artifact,
2551            expected_wasm_sha256,
2552            skip_digest_verification,
2553        };
2554        if let Some(component_id) = entry.component_id.as_ref() {
2555            let key = component_id.as_str().to_string();
2556            if table.contains_key(&key) {
2557                bail!(
2558                    "pack.lock contains duplicate component id `{}`",
2559                    component_id.as_str()
2560                );
2561            }
2562            table.insert(key, info.clone());
2563        }
2564        if entry.name
2565            != entry
2566                .component_id
2567                .as_ref()
2568                .map(|id| id.as_str())
2569                .unwrap_or("")
2570        {
2571            table.insert(entry.name.clone(), info);
2572        }
2573    }
2574    Ok(table)
2575}
2576
2577fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2578    if let Some(path) = &spec.legacy_path {
2579        return root.join(path);
2580    }
2581    root.join("components").join(format!("{}.wasm", spec.id))
2582}
2583
/// Returns `digest` with an explicit algorithm prefix.
///
/// Values that already start with `sha256:` or `blake3:` are returned unchanged;
/// anything else is prefixed with `sha256:`. The match is a case-sensitive prefix
/// comparison and no validation of the remainder is performed.
fn normalize_digest(digest: &str) -> String {
    const KNOWN_PREFIXES: [&str; 2] = ["sha256:", "blake3:"];

    let already_prefixed = KNOWN_PREFIXES
        .iter()
        .any(|prefix| digest.starts_with(*prefix));
    if already_prefixed {
        return digest.to_owned();
    }
    format!("sha256:{digest}")
}
2591
2592fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2593    if digest.starts_with("blake3:") {
2594        let hash = blake3::hash(bytes);
2595        return Ok(format!("blake3:{}", hash.to_hex()));
2596    }
2597    let mut hasher = sha2::Sha256::new();
2598    hasher.update(bytes);
2599    Ok(format!("sha256:{:x}", hasher.finalize()))
2600}
2601
2602fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2603    let mut hasher = sha2::Sha256::new();
2604    hasher.update(bytes);
2605    format!("sha256:{:x}", hasher.finalize())
2606}
2607
2608fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2609    let wasm_digest = digest
2610        .map(normalize_digest)
2611        .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2612    ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2613}
2614
2615async fn compile_component_with_cache(
2616    cache: &CacheManager,
2617    engine: &Engine,
2618    digest: Option<&str>,
2619    bytes: Vec<u8>,
2620) -> Result<Arc<Component>> {
2621    let key = build_artifact_key(cache, digest, &bytes);
2622    cache.get_component(engine, &key, || Ok(bytes)).await
2623}
2624
2625fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2626    let normalized_expected = normalize_digest(expected);
2627    let actual = compute_digest_for(bytes, &normalized_expected)?;
2628    if normalize_digest(&actual) != normalized_expected {
2629        bail!(
2630            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2631        );
2632    }
2633    Ok(())
2634}
2635
2636fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2637    let normalized_expected = normalize_sha256(expected)?;
2638    let actual = compute_sha256_digest_for(bytes);
2639    if actual != normalized_expected {
2640        bail!(
2641            "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2642        );
2643    }
2644    Ok(())
2645}
2646
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    /// A non-bundled lock entry whose ref is a tag must be rejected with a hint
    /// to rebuild the pack.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let tagged_component = PackLockComponent {
            name: "templates".to_string(),
            source_ref: Some("oci://registry.test/templates:latest".to_string()),
            legacy_ref: None,
            component_id: None,
            bundled: Some(false),
            bundled_path: None,
            legacy_path: None,
            wasm_sha256: None,
            legacy_sha256: None,
            resolved_digest: None,
            digest: None,
        };
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![tagged_component],
        };

        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
        let message = err.to_string();
        assert!(
            message.contains("tag ref") && message.contains("rebuild the pack"),
            "unexpected error: {err}"
        );
    }

    /// Loading an inline component whose bytes do not hash to the expected
    /// sha256 must fail with a bundled digest mismatch.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let temp = TempDir::new().expect("temp dir");

        // Cache rooted inside the temporary directory.
        let engine = Engine::default();
        let engine_profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        let cache = CacheManager::new(
            CacheConfig {
                root: temp.path().join("cache"),
                ..CacheConfig::default()
            },
            engine_profile,
        );

        // Copy a real wasm fixture to the location the inline artifact points at.
        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
        let wasm_path = temp.path().join("component.wasm");
        std::fs::write(&wasm_path, &bytes).expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::from([spec.id.clone()]);

        // The expected digest deliberately cannot match the fixture bytes.
        let mismatching_source = ComponentSourceInfo {
            digest: Some("sha256:deadbeef".to_string()),
            source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
            artifact: ComponentArtifactLocation::Inline {
                wasm_path: "component.wasm".to_string(),
            },
            expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
            skip_digest_verification: false,
        };
        let sources = HashMap::from([(spec.id.clone(), mismatching_source)]);

        let mut loaded = HashMap::new();
        let outcome = rt.block_on(load_components_from_sources(
            &cache,
            &engine,
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(temp.path()),
            None,
        ));

        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
2736
#[cfg(test)]
mod pack_resolution_prop_tests {
    use super::*;
    use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
    use proptest::prelude::*;
    use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
    use std::collections::BTreeSet;
    use std::path::Path;
    use std::str::FromStr;

    /// Which key a scenario uses to look a component up in the sources table.
    #[derive(Clone, Debug)]
    enum ResolveRequest {
        ById(String),
        ByName(String),
    }

    /// Comparable snapshot of a successfully resolved table entry.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolvedComponent {
        key: String,
        source: String,
        artifact: String,
        digest: Option<String>,
        expected_wasm_sha256: Option<String>,
        skip_digest_verification: bool,
    }

    /// Comparable description of a failed resolution.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolveError {
        code: String,
        message: String,
        context_key: String,
    }

    /// One generated test case: optional inputs, the lookup request, and the
    /// bytes/digest pair used for the sha256 verification check.
    #[derive(Clone, Debug)]
    struct Scenario {
        pack_lock: Option<PackLockV1>,
        component_sources: Option<ComponentSourcesV1>,
        request: ResolveRequest,
        expected_sha256: Option<String>,
        bytes: Vec<u8>,
    }

    /// Builds the sources table (from the lock when present, otherwise from the
    /// component sources) and looks up the requested key in it.
    fn resolve_component_test(
        sources: Option<&ComponentSourcesV1>,
        lock: Option<&PackLockV1>,
        request: &ResolveRequest,
    ) -> Result<ResolvedComponent, ResolveError> {
        let key = request_key(request);
        let failure = |code: &str, message: String| ResolveError {
            code: code.to_string(),
            message,
            context_key: key.to_string(),
        };

        let table = match lock {
            Some(lock) => component_sources_table_from_pack_lock(lock, false).map_err(|err| {
                let message = err.to_string();
                failure(classify_pack_lock_error(&message), message)
            })?,
            None => component_sources_table(sources)
                .map_err(|err| failure("component_sources_error", err.to_string()))?
                .ok_or_else(|| {
                    failure(
                        "missing_component_sources",
                        "component sources not provided".to_string(),
                    )
                })?,
        };

        let Some(entry) = table.get(key) else {
            return Err(failure(
                "component_not_found",
                format!("component {key} not found"),
            ));
        };

        let artifact = match &entry.artifact {
            ComponentArtifactLocation::Inline { .. } => "inline",
            ComponentArtifactLocation::Remote => "remote",
        };
        Ok(ResolvedComponent {
            key: key.to_string(),
            source: entry.source.to_string(),
            artifact: artifact.to_string(),
            digest: entry.digest.clone(),
            expected_wasm_sha256: entry.expected_wasm_sha256.clone(),
            skip_digest_verification: entry.skip_digest_verification,
        })
    }

    /// Returns the lookup string carried by either request variant.
    fn request_key(request: &ResolveRequest) -> &str {
        match request {
            ResolveRequest::ById(value) | ResolveRequest::ByName(value) => value.as_str(),
        }
    }

    /// Maps a pack.lock error message onto a stable code.
    ///
    /// Rules are tried in order; a rule matches when the message contains every
    /// one of its fragments. Messages matching no rule map to `unknown_error`.
    fn classify_pack_lock_error(message: &str) -> &'static str {
        const RULES: &[(&[&str], &str)] = &[
            (&["duplicate component name"], "duplicate_name"),
            (&["duplicate component id"], "duplicate_id"),
            (&["conflicting refs"], "conflicting_ref"),
            (&["conflicting bundled paths"], "conflicting_bundled_path"),
            (&["conflicting wasm_sha256"], "conflicting_wasm_sha256"),
            (&["missing source_ref"], "missing_source_ref"),
            (
                &["marked bundled but bundled_path is missing"],
                "missing_bundled_path",
            ),
            (&["missing wasm_sha256"], "missing_wasm_sha256"),
            (&["tag ref", "not bundled"], "tag_ref_requires_bundle"),
            (&["missing resolved_digest"], "missing_resolved_digest"),
            (&["invalid component ref"], "invalid_component_ref"),
            (&["sha256 digest"], "invalid_sha256"),
        ];

        RULES
            .iter()
            .find(|(fragments, _)| fragments.iter().all(|fragment| message.contains(*fragment)))
            .map_or("unknown_error", |(_, code)| *code)
    }

    /// Every error code a scenario is allowed to produce.
    fn known_error_codes() -> BTreeSet<&'static str> {
        BTreeSet::from([
            "component_sources_error",
            "missing_component_sources",
            "component_not_found",
            "duplicate_name",
            "duplicate_id",
            "conflicting_ref",
            "conflicting_bundled_path",
            "conflicting_wasm_sha256",
            "missing_source_ref",
            "missing_bundled_path",
            "missing_wasm_sha256",
            "tag_ref_requires_bundle",
            "missing_resolved_digest",
            "invalid_component_ref",
            "invalid_sha256",
            "unknown_error",
        ])
    }

    /// Proptest configuration; the case count comes from `PROPTEST_CASES` when
    /// it parses as `u32`, and defaults to 128 otherwise.
    fn proptest_config() -> ProptestConfig {
        let cases = match std::env::var("PROPTEST_CASES") {
            Ok(raw) => raw.parse::<u32>().unwrap_or(128),
            Err(_) => 128,
        };
        ProptestConfig {
            cases,
            failure_persistence: None,
            ..ProptestConfig::default()
        }
    }

    /// Expands a `u64` seed into the 32-byte RNG seed: the little-endian seed
    /// occupies the first eight bytes and the rest stay zero.
    fn seed_bytes(seed: u64) -> [u8; 32] {
        let mut expanded = [0u8; 32];
        expanded[..8].copy_from_slice(&seed.to_le_bytes());
        expanded
    }

    /// Reads an optional RNG seed from `PROPTEST_SEED` (must parse as `u64`).
    fn proptest_seed() -> Option<[u8; 32]> {
        let raw = std::env::var("PROPTEST_SEED").ok()?;
        raw.parse::<u64>().ok().map(seed_bytes)
    }

    /// Runs `cases` generated scenarios, using a ChaCha RNG seeded from `seed`
    /// when one is given.
    fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
        let config = ProptestConfig {
            cases,
            failure_persistence: None,
            ..ProptestConfig::default()
        };
        let mut runner = if let Some(bytes) = seed {
            TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
        } else {
            TestRunner::new(config)
        };
        let outcome = runner.run(&strategy, |scenario| {
            run_scenario(&scenario);
            Ok(())
        });
        outcome.unwrap();
    }

    /// Checks the invariants for one scenario: resolution is repeatable, the
    /// lock alone decides the outcome when present, errors carry a known code,
    /// and sha256 verification agrees with a direct digest comparison.
    fn run_scenario(scenario: &Scenario) {
        let Scenario {
            pack_lock,
            component_sources,
            request,
            expected_sha256,
            bytes,
        } = scenario;

        let first = resolve_component_test(component_sources.as_ref(), pack_lock.as_ref(), request);
        let second =
            resolve_component_test(component_sources.as_ref(), pack_lock.as_ref(), request);
        assert_eq!(normalize_result(&first), normalize_result(&second));

        if let Some(lock) = pack_lock.as_ref() {
            let lock_only = resolve_component_test(None, Some(lock), request);
            assert_eq!(normalize_result(&first), normalize_result(&lock_only));
        }

        if let Err(err) = &first {
            let known_codes = known_error_codes();
            assert!(
                known_codes.contains(err.code.as_str()),
                "unexpected error code {}: {}",
                err.code,
                err.message
            );
        }

        if let Some(expected) = expected_sha256.as_deref() {
            let expected_ok = verify_wasm_sha256("test.component", expected, bytes).is_ok();
            let actual = compute_sha256_digest_for(bytes);
            let digests_agree = actual == normalize_sha256(expected).unwrap_or_default();
            if digests_agree {
                assert!(expected_ok, "expected sha256 match to succeed");
            } else {
                assert!(!expected_ok, "expected sha256 mismatch to fail");
            }
        }
    }

    /// Produces an owned copy of a resolution outcome for comparison.
    fn normalize_result(
        result: &Result<ResolvedComponent, ResolveError>,
    ) -> Result<ResolvedComponent, ResolveError> {
        result.clone()
    }

    /// Generates scenarios from ten independent inputs.
    ///
    /// The order and kind of the tuple elements determine how RNG output maps to
    /// scenarios, so they must not be rearranged without refreshing the
    /// regression seeds.
    fn scenario_strategy() -> impl Strategy<Value = Scenario> {
        (
            // name
            any::<u8>().prop_map(|n| format!("component{n}.core")),
            // alt_name
            any::<u8>().prop_map(|n| format!("component_alt{n}.core")),
            // tag_ref
            any::<bool>(),
            // bundled
            any::<bool>(),
            // include_sha
            any::<bool>(),
            // include_component_id
            any::<bool>(),
            // request_by_id
            any::<bool>(),
            // use_lock
            any::<bool>(),
            // use_sources
            any::<bool>(),
            // bytes
            prop::collection::vec(any::<u8>(), 1..64),
        )
            .prop_map(
                |(
                    name,
                    alt_name,
                    tag_ref,
                    bundled,
                    include_sha,
                    include_component_id,
                    request_by_id,
                    use_lock,
                    use_sources,
                    bytes,
                )| {
                    let component_id_str = if include_component_id {
                        alt_name
                    } else {
                        name.clone()
                    };
                    let component_id = ComponentId::from_str(&component_id_str).ok();

                    let source_ref = if tag_ref {
                        format!("oci://registry.test/{name}:v1")
                    } else {
                        let pinned = hex::encode([0x11u8; 32]);
                        format!("oci://registry.test/{name}@sha256:{pinned}")
                    };

                    // Only bundled components may carry a wasm sha256.
                    let expected_sha256 =
                        (bundled && include_sha).then(|| compute_sha256_digest_for(&bytes));

                    let lock_component = PackLockComponent {
                        name: name.clone(),
                        source_ref: Some(source_ref),
                        legacy_ref: None,
                        component_id,
                        bundled: Some(bundled),
                        bundled_path: bundled.then(|| format!("components/{name}.wasm")),
                        legacy_path: None,
                        wasm_sha256: expected_sha256.clone(),
                        legacy_sha256: None,
                        resolved_digest: (!bundled).then(|| "sha256:deadbeef".to_string()),
                        digest: None,
                    };

                    let pack_lock = use_lock.then(|| PackLockV1 {
                        schema_version: 1,
                        components: vec![lock_component],
                    });

                    let component_sources = use_sources.then(|| {
                        let source = ComponentSourceRef::from_str(
                            "oci://registry.test/component@sha256:deadbeef",
                        )
                        .expect("component ref");
                        let artifact = if bundled {
                            ArtifactLocationV1::Inline {
                                wasm_path: format!("components/{name}.wasm"),
                                manifest_path: None,
                            }
                        } else {
                            ArtifactLocationV1::Remote
                        };
                        ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
                            name: name.clone(),
                            component_id: ComponentId::from_str(&name).ok(),
                            source,
                            resolved: ResolvedComponentV1 {
                                digest: "sha256:deadbeef".to_string(),
                                signature: None,
                                signed_by: None,
                            },
                            artifact,
                            licensing_hint: None,
                            metering_hint: None,
                        }])
                    });

                    let request = if request_by_id {
                        ResolveRequest::ById(component_id_str)
                    } else {
                        ResolveRequest::ByName(name)
                    };

                    Scenario {
                        pack_lock,
                        component_sources,
                        request,
                        expected_sha256,
                        bytes,
                    }
                },
            )
    }

    /// Randomised run; honours `PROPTEST_CASES` and `PROPTEST_SEED`.
    #[test]
    fn pack_resolution_proptest() {
        let seed = proptest_seed();
        let cases = proptest_config().cases;
        run_cases(scenario_strategy(), cases, seed);
    }

    /// Replays one case per seed listed in the fixture file; blank lines and
    /// lines starting with `#` are ignored.
    #[test]
    fn pack_resolution_regression_seeds() {
        let seeds_path =
            Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
        let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
        let recorded = raw
            .lines()
            .map(str::trim)
            .filter(|line| !line.is_empty() && !line.starts_with('#'));
        for entry in recorded {
            let seed = entry.parse::<u64>().expect("seed must be an integer");
            run_cases(scenario_strategy(), 1, Some(seed_bytes(seed)));
        }
    }
}
3129
3130fn locate_pack_assets(
3131    materialized_root: Option<&Path>,
3132    archive_hint: Option<&Path>,
3133) -> Result<(Option<PathBuf>, Option<TempDir>)> {
3134    if let Some(root) = materialized_root {
3135        let assets = root.join("assets");
3136        if assets.is_dir() {
3137            return Ok((Some(assets), None));
3138        }
3139    }
3140    if let Some(path) = archive_hint
3141        && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
3142    {
3143        return Ok((Some(assets), Some(tempdir)));
3144    }
3145    Ok((None, None))
3146}
3147
3148fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
3149    let file =
3150        File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
3151    let mut archive =
3152        ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
3153    let temp = TempDir::new().context("failed to create temporary assets directory")?;
3154    let mut found = false;
3155    for idx in 0..archive.len() {
3156        let mut entry = archive.by_index(idx)?;
3157        let name = entry.name();
3158        if !name.starts_with("assets/") {
3159            continue;
3160        }
3161        let dest = temp.path().join(name);
3162        if name.ends_with('/') {
3163            std::fs::create_dir_all(&dest)?;
3164            found = true;
3165            continue;
3166        }
3167        if let Some(parent) = dest.parent() {
3168            std::fs::create_dir_all(parent)?;
3169        }
3170        let mut outfile = std::fs::File::create(&dest)?;
3171        std::io::copy(&mut entry, &mut outfile)?;
3172        found = true;
3173    }
3174    if found {
3175        let assets_path = temp.path().join("assets");
3176        Ok(Some((temp, assets_path)))
3177    } else {
3178        Ok(None)
3179    }
3180}
3181
3182fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
3183    let mut opts = DistOptions {
3184        allow_tags: true,
3185        ..DistOptions::default()
3186    };
3187    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
3188        opts.cache_dir = cache_dir;
3189    }
3190    if component_resolution.dist_offline {
3191        opts.offline = true;
3192    }
3193    opts
3194}
3195
3196#[allow(clippy::too_many_arguments)]
3197async fn load_components_from_sources(
3198    cache: &CacheManager,
3199    engine: &Engine,
3200    component_sources: &HashMap<String, ComponentSourceInfo>,
3201    component_resolution: &ComponentResolution,
3202    specs: &[ComponentSpec],
3203    missing: &mut HashSet<String>,
3204    into: &mut HashMap<String, PackComponent>,
3205    materialized_root: Option<&Path>,
3206    archive_hint: Option<&Path>,
3207) -> Result<()> {
3208    let mut archive = if let Some(path) = archive_hint {
3209        Some(
3210            ZipArchive::new(File::open(path)?)
3211                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
3212        )
3213    } else {
3214        None
3215    };
3216    let mut dist_client: Option<DistClient> = None;
3217
3218    for spec in specs {
3219        if !missing.contains(&spec.id) {
3220            continue;
3221        }
3222        let Some(source) = component_sources.get(&spec.id) else {
3223            continue;
3224        };
3225
3226        let bytes = match &source.artifact {
3227            ComponentArtifactLocation::Inline { wasm_path } => {
3228                if let Some(root) = materialized_root {
3229                    let path = root.join(wasm_path);
3230                    if path.exists() {
3231                        std::fs::read(&path).with_context(|| {
3232                            format!(
3233                                "failed to read inline component {} from {}",
3234                                spec.id,
3235                                path.display()
3236                            )
3237                        })?
3238                    } else if archive.is_none() {
3239                        bail!("inline component {} missing at {}", spec.id, path.display());
3240                    } else {
3241                        read_entry(
3242                            archive.as_mut().expect("archive present when needed"),
3243                            wasm_path,
3244                        )
3245                        .with_context(|| {
3246                            format!(
3247                                "inline component {} missing at {} in pack archive",
3248                                spec.id, wasm_path
3249                            )
3250                        })?
3251                    }
3252                } else if let Some(archive) = archive.as_mut() {
3253                    read_entry(archive, wasm_path).with_context(|| {
3254                        format!(
3255                            "inline component {} missing at {} in pack archive",
3256                            spec.id, wasm_path
3257                        )
3258                    })?
3259                } else {
3260                    bail!(
3261                        "inline component {} missing and no pack source available",
3262                        spec.id
3263                    );
3264                }
3265            }
3266            ComponentArtifactLocation::Remote => {
3267                if source.source.is_tag() {
3268                    bail!(
3269                        "component {} uses tag ref {} but is not bundled; rebuild the pack",
3270                        spec.id,
3271                        source.source
3272                    );
3273                }
3274                let client = dist_client.get_or_insert_with(|| {
3275                    DistClient::new(dist_options_from(component_resolution))
3276                });
3277                let reference = source.source.to_string();
3278                fault::maybe_fail_asset(&reference)
3279                    .await
3280                    .with_context(|| format!("fault injection blocked asset {reference}"))?;
3281                let digest = source.digest.as_deref().ok_or_else(|| {
3282                    anyhow!(
3283                        "component {} missing expected digest for remote component",
3284                        spec.id
3285                    )
3286                })?;
3287                let cache_path = if component_resolution.dist_offline {
3288                    client
3289                        .fetch_digest(digest)
3290                        .await
3291                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
3292                } else {
3293                    let resolved = client
3294                        .resolve_ref(&reference)
3295                        .await
3296                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3297                    let expected = normalize_digest(digest);
3298                    let actual = normalize_digest(&resolved.digest);
3299                    if expected != actual {
3300                        bail!(
3301                            "component {} digest mismatch after fetch: expected {}, got {}",
3302                            spec.id,
3303                            expected,
3304                            actual
3305                        );
3306                    }
3307                    resolved.cache_path.ok_or_else(|| {
3308                        anyhow!(
3309                            "component {} resolved from {} but cache path is missing",
3310                            spec.id,
3311                            reference
3312                        )
3313                    })?
3314                };
3315                std::fs::read(&cache_path).with_context(|| {
3316                    format!(
3317                        "failed to read cached component {} from {}",
3318                        spec.id,
3319                        cache_path.display()
3320                    )
3321                })?
3322            }
3323        };
3324
3325        if let Some(expected) = source.expected_wasm_sha256.as_deref() {
3326            verify_wasm_sha256(&spec.id, expected, &bytes)?;
3327        } else if source.skip_digest_verification {
3328            let actual = compute_sha256_digest_for(&bytes);
3329            warn!(
3330                component_id = %spec.id,
3331                digest = %actual,
3332                "bundled component missing wasm_sha256; allowing due to flag"
3333            );
3334        } else {
3335            let expected = source.digest.as_deref().ok_or_else(|| {
3336                anyhow!(
3337                    "component {} missing expected digest for verification",
3338                    spec.id
3339                )
3340            })?;
3341            verify_component_digest(&spec.id, expected, &bytes)?;
3342        }
3343        let component =
3344            compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
3345                .await
3346                .with_context(|| format!("failed to compile component {}", spec.id))?;
3347        into.insert(
3348            spec.id.clone(),
3349            PackComponent {
3350                name: spec.id.clone(),
3351                version: spec.version.clone(),
3352                component,
3353            },
3354        );
3355        missing.remove(&spec.id);
3356    }
3357
3358    Ok(())
3359}
3360
3361fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3362    match err {
3363        DistError::CacheMiss { reference: missing } => anyhow!(
3364            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3365            component_id,
3366            missing,
3367            reference
3368        ),
3369        DistError::Offline { reference: blocked } => anyhow!(
3370            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3371            component_id,
3372            blocked,
3373            reference
3374        ),
3375        DistError::AuthRequired { target } => anyhow!(
3376            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3377            component_id,
3378            target,
3379            reference
3380        ),
3381        other => anyhow!(
3382            "failed to resolve component {} from {}: {}",
3383            component_id,
3384            reference,
3385            other
3386        ),
3387    }
3388}
3389
3390async fn load_components_from_overrides(
3391    cache: &CacheManager,
3392    engine: &Engine,
3393    overrides: &HashMap<String, PathBuf>,
3394    specs: &[ComponentSpec],
3395    missing: &mut HashSet<String>,
3396    into: &mut HashMap<String, PackComponent>,
3397) -> Result<()> {
3398    for spec in specs {
3399        if !missing.contains(&spec.id) {
3400            continue;
3401        }
3402        let Some(path) = overrides.get(&spec.id) else {
3403            continue;
3404        };
3405        let bytes = std::fs::read(path)
3406            .with_context(|| format!("failed to read override component {}", path.display()))?;
3407        let component = compile_component_with_cache(cache, engine, None, bytes)
3408            .await
3409            .with_context(|| {
3410                format!(
3411                    "failed to compile component {} from override {}",
3412                    spec.id,
3413                    path.display()
3414                )
3415            })?;
3416        into.insert(
3417            spec.id.clone(),
3418            PackComponent {
3419                name: spec.id.clone(),
3420                version: spec.version.clone(),
3421                component,
3422            },
3423        );
3424        missing.remove(&spec.id);
3425    }
3426    Ok(())
3427}
3428
3429async fn load_components_from_dir(
3430    cache: &CacheManager,
3431    engine: &Engine,
3432    root: &Path,
3433    specs: &[ComponentSpec],
3434    missing: &mut HashSet<String>,
3435    into: &mut HashMap<String, PackComponent>,
3436) -> Result<()> {
3437    for spec in specs {
3438        if !missing.contains(&spec.id) {
3439            continue;
3440        }
3441        let path = component_path_for_spec(root, spec);
3442        if !path.exists() {
3443            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3444            continue;
3445        }
3446        let bytes = std::fs::read(&path)
3447            .with_context(|| format!("failed to read component {}", path.display()))?;
3448        let component = compile_component_with_cache(cache, engine, None, bytes)
3449            .await
3450            .with_context(|| {
3451                format!(
3452                    "failed to compile component {} from {}",
3453                    spec.id,
3454                    path.display()
3455                )
3456            })?;
3457        into.insert(
3458            spec.id.clone(),
3459            PackComponent {
3460                name: spec.id.clone(),
3461                version: spec.version.clone(),
3462                component,
3463            },
3464        );
3465        missing.remove(&spec.id);
3466    }
3467    Ok(())
3468}
3469
3470async fn load_components_from_archive(
3471    cache: &CacheManager,
3472    engine: &Engine,
3473    path: &Path,
3474    specs: &[ComponentSpec],
3475    missing: &mut HashSet<String>,
3476    into: &mut HashMap<String, PackComponent>,
3477) -> Result<()> {
3478    let mut archive = ZipArchive::new(File::open(path)?)
3479        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3480    for spec in specs {
3481        if !missing.contains(&spec.id) {
3482            continue;
3483        }
3484        let file_name = spec
3485            .legacy_path
3486            .clone()
3487            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3488        let bytes = match read_entry(&mut archive, &file_name) {
3489            Ok(bytes) => bytes,
3490            Err(err) => {
3491                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3492                continue;
3493            }
3494        };
3495        let component = compile_component_with_cache(cache, engine, None, bytes)
3496            .await
3497            .with_context(|| format!("failed to compile component {}", spec.id))?;
3498        into.insert(
3499            spec.id.clone(),
3500            PackComponent {
3501                name: spec.id.clone(),
3502                version: spec.version.clone(),
3503                component,
3504            },
3505        );
3506        missing.remove(&spec.id);
3507    }
3508    Ok(())
3509}
3510
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node that only carries a raw `templating.handlebars` entry must come out of
    /// `normalize_flow_doc` as a `component.exec` node: the raw map is emptied and the
    /// payload names the component, the `render` operation and the original input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let template_input = json!({ "template": "Hi {{name}}" });

        // Build the single start node with one raw component entry.
        let mut raw = IndexMap::new();
        raw.insert("templating.handlebars".into(), template_input.clone());
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let normalized = normalize_flow_doc(FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            nodes,
        });

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component"),
            Some(&Value::String("templating.handlebars".into()))
        );
        assert_eq!(
            payload.get("operation"),
            Some(&Value::String("render".into()))
        );
        assert_eq!(payload.get("input"), Some(&template_input));
    }
}
3564
3565#[derive(Clone, Debug, Default, Serialize, Deserialize)]
3566pub struct PackMetadata {
3567    pub pack_id: String,
3568    pub version: String,
3569    #[serde(default)]
3570    pub entry_flows: Vec<String>,
3571    #[serde(default)]
3572    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
3573}
3574
3575impl PackMetadata {
3576    fn from_wasm(bytes: &[u8]) -> Option<Self> {
3577        let parser = Parser::new(0);
3578        for payload in parser.parse_all(bytes) {
3579            let payload = payload.ok()?;
3580            match payload {
3581                Payload::CustomSection(section) => {
3582                    if section.name() == "greentic.manifest"
3583                        && let Ok(meta) = Self::from_bytes(section.data())
3584                    {
3585                        return Some(meta);
3586                    }
3587                }
3588                Payload::DataSection(reader) => {
3589                    for segment in reader.into_iter().flatten() {
3590                        if let Ok(meta) = Self::from_bytes(segment.data) {
3591                            return Some(meta);
3592                        }
3593                    }
3594                }
3595                _ => {}
3596            }
3597        }
3598        None
3599    }
3600
3601    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
3602        #[derive(Deserialize)]
3603        struct RawManifest {
3604            pack_id: String,
3605            version: String,
3606            #[serde(default)]
3607            entry_flows: Vec<String>,
3608            #[serde(default)]
3609            flows: Vec<RawFlow>,
3610            #[serde(default)]
3611            secret_requirements: Vec<greentic_types::SecretRequirement>,
3612        }
3613
3614        #[derive(Deserialize)]
3615        struct RawFlow {
3616            id: String,
3617        }
3618
3619        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
3620        let mut entry_flows = if manifest.entry_flows.is_empty() {
3621            manifest.flows.iter().map(|f| f.id.clone()).collect()
3622        } else {
3623            manifest.entry_flows.clone()
3624        };
3625        entry_flows.retain(|id| !id.is_empty());
3626        Ok(Self {
3627            pack_id: manifest.pack_id,
3628            version: manifest.version,
3629            entry_flows,
3630            secret_requirements: manifest.secret_requirements,
3631        })
3632    }
3633
3634    pub fn fallback(path: &Path) -> Self {
3635        let pack_id = path
3636            .file_stem()
3637            .map(|s| s.to_string_lossy().into_owned())
3638            .unwrap_or_else(|| "unknown-pack".to_string());
3639        Self {
3640            pack_id,
3641            version: "0.0.0".to_string(),
3642            entry_flows: Vec::new(),
3643            secret_requirements: Vec::new(),
3644        }
3645    }
3646
3647    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
3648        let entry_flows = manifest
3649            .flows
3650            .iter()
3651            .map(|flow| flow.id.as_str().to_string())
3652            .collect::<Vec<_>>();
3653        Self {
3654            pack_id: manifest.pack_id.as_str().to_string(),
3655            version: manifest.version.to_string(),
3656            entry_flows,
3657            secret_requirements: manifest.secret_requirements.clone(),
3658        }
3659    }
3660}