Skip to main content

greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::{
16    schema_core::SchemaCorePre as LegacySchemaCorePre,
17    schema_core_schema::SchemaCorePre as SchemaSchemaCorePre,
18};
19use crate::provider_core_only;
20use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
21use anyhow::{Context, Result, anyhow, bail};
22use futures::executor::block_on;
23use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
24use greentic_interfaces_wasmtime::host_helpers::v1::{
25    self as host_v1, HostFns, add_all_v1_to_linker,
26    runner_host_http::RunnerHostHttp,
27    runner_host_kv::RunnerHostKv,
28    secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
29    state_store::{
30        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
31        StateStoreHost, TenantCtx as StateTenantCtx,
32    },
33    telemetry_logger::{
34        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
35        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
36        TenantCtx as TelemetryTenantCtx,
37    },
38};
39use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
40use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::http::http_client as http_client_client_alias;
41use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
42use greentic_pack::builder as legacy_pack;
43use greentic_types::flow::FlowHasher;
44use greentic_types::{
45    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
46    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
47    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
48    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
49    pack_manifest::ExtensionInline,
50};
51use host_v1::http_client as host_http_client;
52use host_v1::http_client::{
53    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
54    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
55    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
56    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
57};
58use indexmap::IndexMap;
59use once_cell::sync::Lazy;
60use parking_lot::{Mutex, RwLock};
61use reqwest::blocking::Client as BlockingClient;
62use runner_core::normalize_under_root;
63use serde::{Deserialize, Serialize};
64use serde_cbor;
65use serde_json::{self, Value};
66use sha2::Digest;
67use tempfile::TempDir;
68use tokio::fs;
69use wasmparser::{Parser, Payload};
70use wasmtime::{Store, StoreContextMut};
71use zip::ZipArchive;
72
73use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
74use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
75use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
76#[cfg(feature = "fault-injection")]
77use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
78
79use crate::config::HostConfig;
80use crate::fault;
81use crate::secrets::{DynSecretsManager, read_secret_blocking, write_secret_blocking};
82use crate::storage::state::STATE_PREFIX;
83use crate::storage::{DynSessionStore, DynStateStore};
84use crate::verify;
85use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
86use tracing::warn;
87use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
88use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
89
90use greentic_flow::model::FlowDoc;
91
92#[allow(dead_code)]
93pub struct PackRuntime {
94    /// Component artifact path (wasm file).
95    path: PathBuf,
96    /// Optional archive (.gtpack) used to load flows/manifests.
97    archive_path: Option<PathBuf>,
98    config: Arc<HostConfig>,
99    engine: Engine,
100    metadata: PackMetadata,
101    manifest: Option<greentic_types::PackManifest>,
102    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
103    component_manifests: HashMap<String, ComponentManifest>,
104    mocks: Option<Arc<MockLayer>>,
105    flows: Option<PackFlows>,
106    components: HashMap<String, PackComponent>,
107    http_client: Arc<BlockingClient>,
108    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
109    session_store: Option<DynSessionStore>,
110    state_store: Option<DynStateStore>,
111    wasi_policy: Arc<RunnerWasiPolicy>,
112    assets_tempdir: Option<TempDir>,
113    provider_registry: RwLock<Option<ProviderRegistry>>,
114    secrets: DynSecretsManager,
115    oauth_config: Option<OAuthBrokerConfig>,
116    cache: CacheManager,
117}
118
119struct PackComponent {
120    #[allow(dead_code)]
121    name: String,
122    #[allow(dead_code)]
123    version: String,
124    component: Arc<Component>,
125}
126
127fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
128where
129    F: FnOnce() -> Result<T> + Send + 'static,
130    T: Send + 'static,
131{
132    let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
133    let handle = builder
134        .spawn(move || {
135            let pid = std::process::id();
136            let thread_id = std::thread::current().id();
137            let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
138            tracing::info!(
139                event = "wasmtime.thread.start",
140                task = task_name,
141                pid,
142                thread_id = ?thread_id,
143                tokio_handle_present,
144                "starting Wasmtime thread"
145            );
146            task()
147        })
148        .context("failed to spawn Wasmtime thread")?;
149    handle
150        .join()
151        .map_err(|err| {
152            let reason = if let Some(msg) = err.downcast_ref::<&str>() {
153                msg.to_string()
154            } else if let Some(msg) = err.downcast_ref::<String>() {
155                msg.clone()
156            } else {
157                "unknown panic".to_string()
158            };
159            anyhow!("Wasmtime thread panicked: {reason}")
160        })
161        .and_then(|res| res)
162}
163
164#[derive(Debug, Default, Clone)]
165pub struct ComponentResolution {
166    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
167    pub materialized_root: Option<PathBuf>,
168    /// Explicit overrides mapping component id -> wasm path.
169    pub overrides: HashMap<String, PathBuf>,
170    /// If true, do not fetch remote components; require cached artifacts.
171    pub dist_offline: bool,
172    /// Optional cache directory for resolved remote components.
173    pub dist_cache_dir: Option<PathBuf>,
174    /// Allow bundled components without wasm_sha256 (dev-only escape hatch).
175    pub allow_missing_hash: bool,
176}
177
178fn build_blocking_client() -> BlockingClient {
179    std::thread::spawn(|| {
180        BlockingClient::builder()
181            .no_proxy()
182            .build()
183            .expect("blocking client")
184    })
185    .join()
186    .expect("client build thread panicked")
187}
188
189fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
190    let (root, candidate) = if path.is_absolute() {
191        let parent = path
192            .parent()
193            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
194        let root = parent
195            .canonicalize()
196            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
197        let file = path
198            .file_name()
199            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
200        (root, PathBuf::from(file))
201    } else {
202        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
203        let base = if let Some(parent) = path.parent() {
204            cwd.join(parent)
205        } else {
206            cwd
207        };
208        let root = base
209            .canonicalize()
210            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
211        let file = path
212            .file_name()
213            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
214        (root, PathBuf::from(file))
215    };
216    let safe = normalize_under_root(&root, &candidate)?;
217    Ok((root, safe))
218}
219
220static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
221
222#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct FlowDescriptor {
224    pub id: String,
225    #[serde(rename = "type")]
226    pub flow_type: String,
227    pub pack_id: String,
228    pub profile: String,
229    pub version: String,
230    #[serde(default)]
231    pub description: Option<String>,
232}
233
234pub struct HostState {
235    #[allow(dead_code)]
236    pack_id: String,
237    config: Arc<HostConfig>,
238    http_client: Arc<BlockingClient>,
239    default_env: String,
240    #[allow(dead_code)]
241    session_store: Option<DynSessionStore>,
242    state_store: Option<DynStateStore>,
243    mocks: Option<Arc<MockLayer>>,
244    secrets: DynSecretsManager,
245    oauth_config: Option<OAuthBrokerConfig>,
246    oauth_host: OAuthBrokerHost,
247    exec_ctx: Option<ComponentExecCtx>,
248    component_ref: Option<String>,
249    provider_core_component: bool,
250}
251
252impl HostState {
253    #[allow(clippy::default_constructed_unit_structs)]
254    #[allow(clippy::too_many_arguments)]
255    pub fn new(
256        pack_id: String,
257        config: Arc<HostConfig>,
258        http_client: Arc<BlockingClient>,
259        mocks: Option<Arc<MockLayer>>,
260        session_store: Option<DynSessionStore>,
261        state_store: Option<DynStateStore>,
262        secrets: DynSecretsManager,
263        oauth_config: Option<OAuthBrokerConfig>,
264        exec_ctx: Option<ComponentExecCtx>,
265        component_ref: Option<String>,
266        provider_core_component: bool,
267    ) -> Result<Self> {
268        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
269        Ok(Self {
270            pack_id,
271            config,
272            http_client,
273            default_env,
274            session_store,
275            state_store,
276            mocks,
277            secrets,
278            oauth_config,
279            oauth_host: OAuthBrokerHost::default(),
280            exec_ctx,
281            component_ref,
282            provider_core_component,
283        })
284    }
285
286    fn instantiate_component_result(
287        linker: &mut Linker<ComponentState>,
288        store: &mut Store<ComponentState>,
289        component: &Component,
290        ctx: &ComponentExecCtx,
291        operation: &str,
292        input_json: &str,
293    ) -> Result<InvokeResult> {
294        let pre_instance = linker.instantiate_pre(component)?;
295        match component_api::v0_5::ComponentPre::new(pre_instance) {
296            Ok(pre) => {
297                let result = block_on(async {
298                    let bindings = pre.instantiate_async(&mut *store).await?;
299                    let node = bindings.greentic_component_node();
300                    let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
301                    let operation_owned = operation.to_string();
302                    let input_owned = input_json.to_string();
303                    node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
304                })?;
305                Ok(component_api::invoke_result_from_v0_5(result))
306            }
307            Err(err) => {
308                if is_missing_node_export(&err, "0.5.0") {
309                    let pre_instance = linker.instantiate_pre(component)?;
310                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
311                        component_api::v0_4::ComponentPre::new(pre_instance)?;
312                    let result = block_on(async {
313                        let bindings = pre.instantiate_async(&mut *store).await?;
314                        let node = bindings.greentic_component_node();
315                        let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
316                        let operation_owned = operation.to_string();
317                        let input_owned = input_json.to_string();
318                        node.call_invoke(&mut *store, &ctx_v04, &operation_owned, &input_owned)
319                    })?;
320                    Ok(component_api::invoke_result_from_v0_4(result))
321                } else {
322                    Err(err)
323                }
324            }
325        }
326    }
327
328    fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
329        match result {
330            InvokeResult::Ok(body) => {
331                if body.is_empty() {
332                    return Ok(Value::Null);
333                }
334                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
335            }
336            InvokeResult::Err(NodeError {
337                code,
338                message,
339                retryable,
340                backoff_ms,
341                details,
342            }) => {
343                let mut obj = serde_json::Map::new();
344                obj.insert("ok".into(), Value::Bool(false));
345                let mut error = serde_json::Map::new();
346                error.insert("code".into(), Value::String(code));
347                error.insert("message".into(), Value::String(message));
348                error.insert("retryable".into(), Value::Bool(retryable));
349                if let Some(backoff) = backoff_ms {
350                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
351                }
352                if let Some(details) = details {
353                    error.insert(
354                        "details".into(),
355                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
356                    );
357                }
358                obj.insert("error".into(), Value::Object(error));
359                Ok(Value::Object(obj))
360            }
361        }
362    }
363
364    pub fn get_secret(&self, key: &str) -> Result<String> {
365        if provider_core_only::is_enabled() {
366            bail!(provider_core_only::blocked_message("secrets"))
367        }
368        if !self.config.secrets_policy.is_allowed(key) {
369            bail!("secret {key} is not permitted by bindings policy");
370        }
371        if let Some(mock) = &self.mocks
372            && let Some(value) = mock.secrets_lookup(key)
373        {
374            return Ok(value);
375        }
376        let ctx = self.config.tenant_ctx();
377        let bytes = read_secret_blocking(&self.secrets, &ctx, key)
378            .context("failed to read secret from manager")?;
379        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
380        Ok(value)
381    }
382
383    fn allows_secret_write_in_provider_core_only(&self) -> bool {
384        self.provider_core_component || self.component_ref.is_none()
385    }
386
387    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
388        let tenant_raw = ctx
389            .as_ref()
390            .map(|ctx| ctx.tenant.clone())
391            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
392            .unwrap_or_else(|| self.config.tenant.clone());
393        let env_raw = ctx
394            .as_ref()
395            .map(|ctx| ctx.env.clone())
396            .unwrap_or_else(|| self.default_env.clone());
397        let tenant_id = TenantId::from_str(&tenant_raw)
398            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
399        let env_id = EnvId::from_str(&env_raw)
400            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
401        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
402        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
403            if let Some(team) = exec_ctx.tenant.team.as_ref() {
404                let team_id =
405                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
406                tenant_ctx = tenant_ctx.with_team(Some(team_id));
407            }
408            if let Some(user) = exec_ctx.tenant.user.as_ref() {
409                let user_id =
410                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
411                tenant_ctx = tenant_ctx.with_user(Some(user_id));
412            }
413            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
414            if let Some(node) = exec_ctx.node_id.as_ref() {
415                tenant_ctx = tenant_ctx.with_node(node.clone());
416            }
417            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
418                tenant_ctx = tenant_ctx.with_session(session.clone());
419            }
420            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
421        }
422
423        if let Some(ctx) = ctx {
424            if let Some(team) = ctx.team.or(ctx.team_id) {
425                let team_id =
426                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
427                tenant_ctx = tenant_ctx.with_team(Some(team_id));
428            }
429            if let Some(user) = ctx.user.or(ctx.user_id) {
430                let user_id =
431                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
432                tenant_ctx = tenant_ctx.with_user(Some(user_id));
433            }
434            if let Some(flow) = ctx.flow_id {
435                tenant_ctx = tenant_ctx.with_flow(flow);
436            }
437            if let Some(node) = ctx.node_id {
438                tenant_ctx = tenant_ctx.with_node(node);
439            }
440            if let Some(provider) = ctx.provider_id {
441                tenant_ctx = tenant_ctx.with_provider(provider);
442            }
443            if let Some(session) = ctx.session_id {
444                tenant_ctx = tenant_ctx.with_session(session);
445            }
446            tenant_ctx.trace_id = ctx.trace_id;
447        }
448        Ok(tenant_ctx)
449    }
450
451    fn send_http_request(
452        &mut self,
453        req: HttpRequest,
454        opts: Option<HttpRequestOptionsV1_1>,
455        _ctx: Option<HttpTenantCtx>,
456    ) -> Result<HttpResponse, HttpClientError> {
457        if !self.config.http_enabled {
458            return Err(HttpClientError {
459                code: "denied".into(),
460                message: "http client disabled by policy".into(),
461            });
462        }
463
464        let mut mock_state = None;
465        let raw_body = req.body.clone();
466        if let Some(mock) = &self.mocks
467            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
468        {
469            match mock.http_begin(&meta) {
470                HttpDecision::Mock(response) => {
471                    let headers = response
472                        .headers
473                        .iter()
474                        .map(|(k, v)| (k.clone(), v.clone()))
475                        .collect();
476                    return Ok(HttpResponse {
477                        status: response.status,
478                        headers,
479                        body: response.body.clone().map(|b| b.into_bytes()),
480                    });
481                }
482                HttpDecision::Deny(reason) => {
483                    return Err(HttpClientError {
484                        code: "denied".into(),
485                        message: reason,
486                    });
487                }
488                HttpDecision::Passthrough { record } => {
489                    mock_state = Some((meta, record));
490                }
491            }
492        }
493
494        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
495        let mut builder = self.http_client.request(method, &req.url);
496        for (key, value) in req.headers {
497            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
498                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
499            {
500                builder = builder.header(header, header_value);
501            }
502        }
503
504        if let Some(body) = raw_body.clone() {
505            builder = builder.body(body);
506        }
507
508        if let Some(opts) = opts {
509            if let Some(timeout_ms) = opts.timeout_ms {
510                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
511            }
512            if opts.allow_insecure == Some(true) {
513                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
514            }
515            if let Some(follow_redirects) = opts.follow_redirects
516                && !follow_redirects
517            {
518                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
519            }
520        }
521
522        let response = match builder.send() {
523            Ok(resp) => resp,
524            Err(err) => {
525                warn!(url = %req.url, error = %err, "http client request failed");
526                return Err(HttpClientError {
527                    code: "unavailable".into(),
528                    message: err.to_string(),
529                });
530            }
531        };
532
533        let status = response.status().as_u16();
534        let headers_vec = response
535            .headers()
536            .iter()
537            .map(|(k, v)| {
538                (
539                    k.as_str().to_string(),
540                    v.to_str().unwrap_or_default().to_string(),
541                )
542            })
543            .collect::<Vec<_>>();
544        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
545
546        if let Some((meta, true)) = mock_state.take()
547            && let Some(mock) = &self.mocks
548        {
549            let recorded = HttpMockResponse::new(
550                status,
551                headers_vec.clone().into_iter().collect(),
552                body_bytes
553                    .as_ref()
554                    .map(|b| String::from_utf8_lossy(b).into_owned()),
555            );
556            mock.http_record(&meta, &recorded);
557        }
558
559        Ok(HttpResponse {
560            status,
561            headers: headers_vec,
562            body: body_bytes,
563        })
564    }
565}
566
567impl SecretsStoreHost for HostState {
568    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
569        if provider_core_only::is_enabled() {
570            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
571            return Err(SecretsError::Denied);
572        }
573        if !self.config.secrets_policy.is_allowed(&key) {
574            return Err(SecretsError::Denied);
575        }
576        if let Some(mock) = &self.mocks
577            && let Some(value) = mock.secrets_lookup(&key)
578        {
579            return Ok(Some(value.into_bytes()));
580        }
581        let ctx = self.config.tenant_ctx();
582        match read_secret_blocking(&self.secrets, &ctx, &key) {
583            Ok(bytes) => Ok(Some(bytes)),
584            Err(err) => {
585                warn!(secret = %key, error = %err, "secret lookup failed");
586                Err(SecretsError::NotFound)
587            }
588        }
589    }
590}
591
592impl SecretsStoreHostV1_1 for HostState {
593    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
594        if provider_core_only::is_enabled() {
595            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
596            return Err(SecretsErrorV1_1::Denied);
597        }
598        if !self.config.secrets_policy.is_allowed(&key) {
599            return Err(SecretsErrorV1_1::Denied);
600        }
601        if let Some(mock) = &self.mocks
602            && let Some(value) = mock.secrets_lookup(&key)
603        {
604            return Ok(Some(value.into_bytes()));
605        }
606        let ctx = self.config.tenant_ctx();
607        match read_secret_blocking(&self.secrets, &ctx, &key) {
608            Ok(bytes) => Ok(Some(bytes)),
609            Err(err) => {
610                warn!(secret = %key, error = %err, "secret lookup failed");
611                Err(SecretsErrorV1_1::NotFound)
612            }
613        }
614    }
615
616    fn put(&mut self, key: String, value: Vec<u8>) {
617        if key.trim().is_empty() {
618            warn!(secret = %key, "secret write blocked: empty key");
619            panic!("secret write denied for key {key}: invalid key");
620        }
621        if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
622            warn!(
623                secret = %key,
624                component = self.component_ref.as_deref().unwrap_or("<pack>"),
625                "provider-core only mode enabled; blocking secrets store write"
626            );
627            panic!("secret write denied for key {key}: provider-core-only mode");
628        }
629        if !self.config.secrets_policy.is_allowed(&key) {
630            warn!(secret = %key, "secret write denied by bindings policy");
631            panic!("secret write denied for key {key}: policy");
632        }
633        let ctx = self.config.tenant_ctx();
634        if let Err(err) = write_secret_blocking(&self.secrets, &ctx, &key, &value) {
635            warn!(secret = %key, error = %err, "secret write failed");
636            panic!("secret write failed for key {key}");
637        }
638    }
639}
640
641impl HttpClientHost for HostState {
642    fn send(
643        &mut self,
644        req: HttpRequest,
645        ctx: Option<HttpTenantCtx>,
646    ) -> Result<HttpResponse, HttpClientError> {
647        self.send_http_request(req, None, ctx)
648    }
649}
650
651impl HttpClientHostV1_1 for HostState {
652    fn send(
653        &mut self,
654        req: HttpRequestV1_1,
655        opts: Option<HttpRequestOptionsV1_1>,
656        ctx: Option<HttpTenantCtxV1_1>,
657    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
658        let legacy_req = HttpRequest {
659            method: req.method,
660            url: req.url,
661            headers: req.headers,
662            body: req.body,
663        };
664        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
665            env: ctx.env,
666            tenant: ctx.tenant,
667            tenant_id: ctx.tenant_id,
668            team: ctx.team,
669            team_id: ctx.team_id,
670            user: ctx.user,
671            user_id: ctx.user_id,
672            trace_id: ctx.trace_id,
673            correlation_id: ctx.correlation_id,
674            attributes: ctx.attributes,
675            session_id: ctx.session_id,
676            flow_id: ctx.flow_id,
677            node_id: ctx.node_id,
678            provider_id: ctx.provider_id,
679            deadline_ms: ctx.deadline_ms,
680            attempt: ctx.attempt,
681            idempotency_key: ctx.idempotency_key,
682            impersonation: ctx
683                .impersonation
684                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
685                    actor_id,
686                    reason,
687                }),
688        });
689
690        self.send_http_request(legacy_req, opts, legacy_ctx)
691            .map(|resp| HttpResponseV1_1 {
692                status: resp.status,
693                headers: resp.headers,
694                body: resp.body,
695            })
696            .map_err(|err| HttpClientErrorV1_1 {
697                code: err.code,
698                message: err.message,
699            })
700    }
701}
702
703impl StateStoreHost for HostState {
704    fn read(
705        &mut self,
706        key: HostStateKey,
707        ctx: Option<StateTenantCtx>,
708    ) -> Result<Vec<u8>, StateError> {
709        let store = match self.state_store.as_ref() {
710            Some(store) => store.clone(),
711            None => {
712                return Err(StateError {
713                    code: "unavailable".into(),
714                    message: "state store not configured".into(),
715                });
716            }
717        };
718        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
719            Ok(ctx) => ctx,
720            Err(err) => {
721                return Err(StateError {
722                    code: "invalid-ctx".into(),
723                    message: err.to_string(),
724                });
725            }
726        };
727        #[cfg(feature = "fault-injection")]
728        {
729            let exec_ctx = self.exec_ctx.as_ref();
730            let flow_id = exec_ctx
731                .map(|ctx| ctx.flow_id.as_str())
732                .unwrap_or("unknown");
733            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
734            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
735            let fault_ctx = FaultContext {
736                pack_id: self.pack_id.as_str(),
737                flow_id,
738                node_id,
739                attempt,
740            };
741            if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
742                return Err(StateError {
743                    code: "internal".into(),
744                    message: err.to_string(),
745                });
746            }
747        }
748        let key = StoreStateKey::from(key);
749        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
750            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
751            Ok(None) => Err(StateError {
752                code: "not_found".into(),
753                message: "state key not found".into(),
754            }),
755            Err(err) => Err(StateError {
756                code: "internal".into(),
757                message: err.to_string(),
758            }),
759        }
760    }
761
762    fn write(
763        &mut self,
764        key: HostStateKey,
765        bytes: Vec<u8>,
766        ctx: Option<StateTenantCtx>,
767    ) -> Result<StateOpAck, StateError> {
768        let store = match self.state_store.as_ref() {
769            Some(store) => store.clone(),
770            None => {
771                return Err(StateError {
772                    code: "unavailable".into(),
773                    message: "state store not configured".into(),
774                });
775            }
776        };
777        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
778            Ok(ctx) => ctx,
779            Err(err) => {
780                return Err(StateError {
781                    code: "invalid-ctx".into(),
782                    message: err.to_string(),
783                });
784            }
785        };
786        #[cfg(feature = "fault-injection")]
787        {
788            let exec_ctx = self.exec_ctx.as_ref();
789            let flow_id = exec_ctx
790                .map(|ctx| ctx.flow_id.as_str())
791                .unwrap_or("unknown");
792            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
793            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
794            let fault_ctx = FaultContext {
795                pack_id: self.pack_id.as_str(),
796                flow_id,
797                node_id,
798                attempt,
799            };
800            if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
801                return Err(StateError {
802                    code: "internal".into(),
803                    message: err.to_string(),
804                });
805            }
806        }
807        let key = StoreStateKey::from(key);
808        let value = serde_json::from_slice(&bytes)
809            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
810        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
811            Ok(()) => Ok(StateOpAck::Ok),
812            Err(err) => Err(StateError {
813                code: "internal".into(),
814                message: err.to_string(),
815            }),
816        }
817    }
818
819    fn delete(
820        &mut self,
821        key: HostStateKey,
822        ctx: Option<StateTenantCtx>,
823    ) -> Result<StateOpAck, StateError> {
824        let store = match self.state_store.as_ref() {
825            Some(store) => store.clone(),
826            None => {
827                return Err(StateError {
828                    code: "unavailable".into(),
829                    message: "state store not configured".into(),
830                });
831            }
832        };
833        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
834            Ok(ctx) => ctx,
835            Err(err) => {
836                return Err(StateError {
837                    code: "invalid-ctx".into(),
838                    message: err.to_string(),
839                });
840            }
841        };
842        let key = StoreStateKey::from(key);
843        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
844            Ok(_) => Ok(StateOpAck::Ok),
845            Err(err) => Err(StateError {
846                code: "internal".into(),
847                message: err.to_string(),
848            }),
849        }
850    }
851}
852
853impl TelemetryLoggerHost for HostState {
854    fn log(
855        &mut self,
856        span: TelemetrySpanContext,
857        fields: Vec<(String, String)>,
858        _ctx: Option<TelemetryTenantCtx>,
859    ) -> Result<TelemetryAck, TelemetryError> {
860        if let Some(mock) = &self.mocks
861            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
862        {
863            return Ok(TelemetryAck::Ok);
864        }
865        let mut map = serde_json::Map::new();
866        for (k, v) in fields {
867            map.insert(k, Value::String(v));
868        }
869        tracing::info!(
870            tenant = %span.tenant,
871            flow_id = %span.flow_id,
872            node = ?span.node_id,
873            provider = %span.provider,
874            fields = %serde_json::Value::Object(map.clone()),
875            "telemetry log from pack"
876        );
877        Ok(TelemetryAck::Ok)
878    }
879}
880
881impl RunnerHostHttp for HostState {
882    fn request(
883        &mut self,
884        method: String,
885        url: String,
886        headers: Vec<String>,
887        body: Option<Vec<u8>>,
888    ) -> Result<Vec<u8>, String> {
889        let req = HttpRequest {
890            method,
891            url,
892            headers: headers
893                .chunks(2)
894                .filter_map(|chunk| {
895                    if chunk.len() == 2 {
896                        Some((chunk[0].clone(), chunk[1].clone()))
897                    } else {
898                        None
899                    }
900                })
901                .collect(),
902            body,
903        };
904        match HttpClientHost::send(self, req, None) {
905            Ok(resp) => Ok(resp.body.unwrap_or_default()),
906            Err(err) => Err(err.message),
907        }
908    }
909}
910
911impl RunnerHostKv for HostState {
912    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
913        None
914    }
915
916    fn put(&mut self, _ns: String, _key: String, _val: String) {}
917}
918
919enum ManifestLoad {
920    New {
921        manifest: Box<greentic_types::PackManifest>,
922        flows: PackFlows,
923    },
924    Legacy {
925        manifest: Box<legacy_pack::PackManifest>,
926        flows: PackFlows,
927    },
928}
929
930fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
931    let mut archive = ZipArchive::new(File::open(path)?)
932        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
933    let bytes = read_entry(&mut archive, "manifest.cbor")
934        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
935    match decode_pack_manifest(&bytes) {
936        Ok(manifest) => {
937            let cache = PackFlows::from_manifest(manifest.clone());
938            Ok(ManifestLoad::New {
939                manifest: Box::new(manifest),
940                flows: cache,
941            })
942        }
943        Err(err) => {
944            tracing::debug!(
945                error = %err,
946                pack = %path.display(),
947                "decode_pack_manifest failed for archive; falling back to legacy manifest"
948            );
949            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
950                .context("failed to decode legacy pack manifest from manifest.cbor")?;
951            let flows = load_legacy_flows_from_archive(&mut archive, path, &legacy)?;
952            Ok(ManifestLoad::Legacy {
953                manifest: Box::new(legacy),
954                flows,
955            })
956        }
957    }
958}
959
960fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
961    let manifest_path = root.join("manifest.cbor");
962    let bytes = std::fs::read(&manifest_path)
963        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
964    match decode_pack_manifest(&bytes) {
965        Ok(manifest) => {
966            let cache = PackFlows::from_manifest(manifest.clone());
967            Ok(ManifestLoad::New {
968                manifest: Box::new(manifest),
969                flows: cache,
970            })
971        }
972        Err(err) => {
973            tracing::debug!(
974                error = %err,
975                pack = %root.display(),
976                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
977            );
978            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
979                .context("failed to decode legacy pack manifest from manifest.cbor")?;
980            let flows = load_legacy_flows_from_dir(root, &legacy)?;
981            Ok(ManifestLoad::Legacy {
982                manifest: Box::new(legacy),
983                flows,
984            })
985        }
986    }
987}
988
989fn load_legacy_flows_from_dir(
990    root: &Path,
991    manifest: &legacy_pack::PackManifest,
992) -> Result<PackFlows> {
993    build_legacy_flows(manifest, |rel_path| {
994        let path = root.join(rel_path);
995        std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
996    })
997}
998
999fn load_legacy_flows_from_archive(
1000    archive: &mut ZipArchive<File>,
1001    pack_path: &Path,
1002    manifest: &legacy_pack::PackManifest,
1003) -> Result<PackFlows> {
1004    build_legacy_flows(manifest, |rel_path| {
1005        read_entry(archive, rel_path)
1006            .with_context(|| format!("missing flow json {} in {}", rel_path, pack_path.display()))
1007    })
1008}
1009
1010fn build_legacy_flows(
1011    manifest: &legacy_pack::PackManifest,
1012    mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1013) -> Result<PackFlows> {
1014    let mut flows = HashMap::new();
1015    let mut descriptors = Vec::new();
1016
1017    for entry in &manifest.flows {
1018        let bytes = read_json(&entry.file_json)
1019            .with_context(|| format!("missing flow json {}", entry.file_json))?;
1020        let doc = parse_flow_doc_with_legacy_aliases(&bytes, &entry.file_json)?;
1021        let normalized = normalize_flow_doc(doc);
1022        let flow_ir = flow_doc_to_ir(normalized)?;
1023        let flow = flow_ir_to_flow(flow_ir)?;
1024
1025        descriptors.push(FlowDescriptor {
1026            id: entry.id.clone(),
1027            flow_type: entry.kind.clone(),
1028            pack_id: manifest.meta.pack_id.clone(),
1029            profile: manifest.meta.pack_id.clone(),
1030            version: manifest.meta.version.to_string(),
1031            description: None,
1032        });
1033        flows.insert(entry.id.clone(), flow);
1034    }
1035
1036    let mut entry_flows = manifest.meta.entry_flows.clone();
1037    if entry_flows.is_empty() {
1038        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1039    }
1040    let metadata = PackMetadata {
1041        pack_id: manifest.meta.pack_id.clone(),
1042        version: manifest.meta.version.to_string(),
1043        entry_flows,
1044        secret_requirements: Vec::new(),
1045    };
1046
1047    Ok(PackFlows {
1048        descriptors,
1049        flows,
1050        metadata,
1051    })
1052}
1053
1054fn parse_flow_doc_with_legacy_aliases(bytes: &[u8], path: &str) -> Result<FlowDoc> {
1055    let mut value: Value = serde_json::from_slice(bytes)
1056        .with_context(|| format!("failed to decode flow doc {}", path))?;
1057    if let Some(map) = value.as_object_mut()
1058        && !map.contains_key("type")
1059        && let Some(flow_type) = map.remove("flow_type")
1060    {
1061        map.insert("type".to_string(), flow_type);
1062    }
1063    serde_json::from_value(value).with_context(|| format!("failed to decode flow doc {}", path))
1064}
1065
1066pub struct ComponentState {
1067    pub host: HostState,
1068    wasi_ctx: WasiCtx,
1069    resource_table: ResourceTable,
1070}
1071
1072impl ComponentState {
1073    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1074        let wasi_ctx = policy
1075            .instantiate()
1076            .context("failed to build WASI context")?;
1077        Ok(Self {
1078            host,
1079            wasi_ctx,
1080            resource_table: ResourceTable::new(),
1081        })
1082    }
1083
1084    fn host_mut(&mut self) -> &mut HostState {
1085        &mut self.host
1086    }
1087
1088    fn should_cancel_host(&mut self) -> bool {
1089        false
1090    }
1091
1092    fn yield_now_host(&mut self) {
1093        // no-op cooperative yield
1094    }
1095}
1096
1097impl component_api::v0_4::greentic::component::control::Host for ComponentState {
1098    fn should_cancel(&mut self) -> bool {
1099        self.should_cancel_host()
1100    }
1101
1102    fn yield_now(&mut self) {
1103        self.yield_now_host();
1104    }
1105}
1106
1107impl component_api::v0_5::greentic::component::control::Host for ComponentState {
1108    fn should_cancel(&mut self) -> bool {
1109        self.should_cancel_host()
1110    }
1111
1112    fn yield_now(&mut self) {
1113        self.yield_now_host();
1114    }
1115}
1116
1117fn add_component_control_instance(
1118    linker: &mut Linker<ComponentState>,
1119    name: &str,
1120) -> wasmtime::Result<()> {
1121    let mut inst = linker.instance(name)?;
1122    inst.func_wrap(
1123        "should-cancel",
1124        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1125            let host = caller.data_mut();
1126            Ok((host.should_cancel_host(),))
1127        },
1128    )?;
1129    inst.func_wrap(
1130        "yield-now",
1131        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1132            let host = caller.data_mut();
1133            host.yield_now_host();
1134            Ok(())
1135        },
1136    )?;
1137    Ok(())
1138}
1139
1140fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1141    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1142    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1143    Ok(())
1144}
1145
1146pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1147    add_wasi_to_linker(linker)?;
1148    add_all_v1_to_linker(
1149        linker,
1150        HostFns {
1151            http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1152            http_client: Some(|state: &mut ComponentState| state.host_mut()),
1153            oauth_broker: None,
1154            runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1155            runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1156            telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
1157            state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1158            secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1159            secrets_store: None,
1160        },
1161    )?;
1162    add_http_client_client_world_aliases(linker)?;
1163    Ok(())
1164}
1165
1166fn add_http_client_client_world_aliases(linker: &mut Linker<ComponentState>) -> Result<()> {
1167    let mut inst_v1_1 = linker.instance("greentic:http/client@1.1.0")?;
1168    inst_v1_1.func_wrap(
1169        "send",
1170        move |mut caller: StoreContextMut<'_, ComponentState>,
1171              (req, opts, ctx): (
1172            http_client_client_alias::Request,
1173            Option<http_client_client_alias::RequestOptions>,
1174            Option<http_client_client_alias::TenantCtx>,
1175        )| {
1176            let host = caller.data_mut().host_mut();
1177            let result = HttpClientHostV1_1::send(
1178                host,
1179                alias_request_to_host(req),
1180                opts.map(alias_request_options_to_host),
1181                ctx.map(alias_tenant_ctx_to_host),
1182            );
1183            Ok((match result {
1184                Ok(resp) => Ok(alias_response_from_host(resp)),
1185                Err(err) => Err(alias_error_from_host(err)),
1186            },))
1187        },
1188    )?;
1189    let mut inst_v1_0 = linker.instance("greentic:http/client@1.0.0")?;
1190    inst_v1_0.func_wrap(
1191        "send",
1192        move |mut caller: StoreContextMut<'_, ComponentState>,
1193              (req, ctx): (
1194            host_http_client::Request,
1195            Option<host_http_client::TenantCtx>,
1196        )| {
1197            let host = caller.data_mut().host_mut();
1198            let result = HttpClientHost::send(host, req, ctx);
1199            Ok((result,))
1200        },
1201    )?;
1202    Ok(())
1203}
1204
1205fn alias_request_to_host(req: http_client_client_alias::Request) -> host_http_client::RequestV1_1 {
1206    host_http_client::RequestV1_1 {
1207        method: req.method,
1208        url: req.url,
1209        headers: req.headers,
1210        body: req.body,
1211    }
1212}
1213
1214fn alias_request_options_to_host(
1215    opts: http_client_client_alias::RequestOptions,
1216) -> host_http_client::RequestOptionsV1_1 {
1217    host_http_client::RequestOptionsV1_1 {
1218        timeout_ms: opts.timeout_ms,
1219        allow_insecure: opts.allow_insecure,
1220        follow_redirects: opts.follow_redirects,
1221    }
1222}
1223
1224fn alias_tenant_ctx_to_host(
1225    ctx: http_client_client_alias::TenantCtx,
1226) -> host_http_client::TenantCtxV1_1 {
1227    host_http_client::TenantCtxV1_1 {
1228        env: ctx.env,
1229        tenant: ctx.tenant,
1230        tenant_id: ctx.tenant_id,
1231        team: ctx.team,
1232        team_id: ctx.team_id,
1233        user: ctx.user,
1234        user_id: ctx.user_id,
1235        trace_id: ctx.trace_id,
1236        correlation_id: ctx.correlation_id,
1237        attributes: ctx.attributes,
1238        session_id: ctx.session_id,
1239        flow_id: ctx.flow_id,
1240        node_id: ctx.node_id,
1241        provider_id: ctx.provider_id,
1242        deadline_ms: ctx.deadline_ms,
1243        attempt: ctx.attempt,
1244        idempotency_key: ctx.idempotency_key,
1245        impersonation: ctx.impersonation.map(|imp| ImpersonationV1_1 {
1246            actor_id: imp.actor_id,
1247            reason: imp.reason,
1248        }),
1249    }
1250}
1251
1252fn alias_response_from_host(
1253    resp: host_http_client::ResponseV1_1,
1254) -> http_client_client_alias::Response {
1255    http_client_client_alias::Response {
1256        status: resp.status,
1257        headers: resp.headers,
1258        body: resp.body,
1259    }
1260}
1261
1262fn alias_error_from_host(
1263    err: host_http_client::HttpClientErrorV1_1,
1264) -> http_client_client_alias::HostError {
1265    http_client_client_alias::HostError {
1266        code: err.code,
1267        message: err.message,
1268    }
1269}
1270
1271impl OAuthHostContext for ComponentState {
1272    fn tenant_id(&self) -> &str {
1273        &self.host.config.tenant
1274    }
1275
1276    fn env(&self) -> &str {
1277        &self.host.default_env
1278    }
1279
1280    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
1281        &mut self.host.oauth_host
1282    }
1283
1284    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
1285        self.host.oauth_config.as_ref()
1286    }
1287}
1288
1289impl WasiView for ComponentState {
1290    fn ctx(&mut self) -> WasiCtxView<'_> {
1291        WasiCtxView {
1292            ctx: &mut self.wasi_ctx,
1293            table: &mut self.resource_table,
1294        }
1295    }
1296}
1297
1298#[allow(unsafe_code)]
1299unsafe impl Send for ComponentState {}
1300#[allow(unsafe_code)]
1301unsafe impl Sync for ComponentState {}
1302
1303impl PackRuntime {
1304    fn allows_state_store(&self, component_ref: &str) -> bool {
1305        if self.state_store.is_none() {
1306            return false;
1307        }
1308        if !self.config.state_store_policy.allow {
1309            return false;
1310        }
1311        let Some(manifest) = self.component_manifests.get(component_ref) else {
1312            return false;
1313        };
1314        manifest
1315            .capabilities
1316            .host
1317            .state
1318            .as_ref()
1319            .map(|caps| caps.read || caps.write)
1320            .unwrap_or(false)
1321    }
1322
1323    pub fn contains_component(&self, component_ref: &str) -> bool {
1324        self.components.contains_key(component_ref)
1325    }
1326
1327    #[allow(clippy::too_many_arguments)]
1328    pub async fn load(
1329        path: impl AsRef<Path>,
1330        config: Arc<HostConfig>,
1331        mocks: Option<Arc<MockLayer>>,
1332        archive_source: Option<&Path>,
1333        session_store: Option<DynSessionStore>,
1334        state_store: Option<DynStateStore>,
1335        wasi_policy: Arc<RunnerWasiPolicy>,
1336        secrets: DynSecretsManager,
1337        oauth_config: Option<OAuthBrokerConfig>,
1338        verify_archive: bool,
1339        component_resolution: ComponentResolution,
1340    ) -> Result<Self> {
1341        let path = path.as_ref();
1342        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1343        let path_meta = std::fs::metadata(&safe_path).ok();
1344        let is_dir = path_meta
1345            .as_ref()
1346            .map(|meta| meta.is_dir())
1347            .unwrap_or(false);
1348        let is_component = !is_dir
1349            && safe_path
1350                .extension()
1351                .and_then(|ext| ext.to_str())
1352                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1353                .unwrap_or(false);
1354        let archive_hint_path = if let Some(source) = archive_source {
1355            let (_, normalized) = normalize_pack_path(source)?;
1356            Some(normalized)
1357        } else if is_component || is_dir {
1358            None
1359        } else {
1360            Some(safe_path.clone())
1361        };
1362        let archive_hint = archive_hint_path.as_deref();
1363        if verify_archive {
1364            if let Some(verify_target) = archive_hint.and_then(|p| {
1365                std::fs::metadata(p)
1366                    .ok()
1367                    .filter(|meta| meta.is_file())
1368                    .map(|_| p)
1369            }) {
1370                verify::verify_pack(verify_target).await?;
1371                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1372            } else {
1373                tracing::debug!("skipping archive verification (no archive source)");
1374            }
1375        }
1376        let engine = Engine::default();
1377        let engine_profile =
1378            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1379        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1380        let mut metadata = PackMetadata::fallback(&safe_path);
1381        let mut manifest = None;
1382        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1383        let mut flows = None;
1384        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1385            if is_dir {
1386                Some(safe_path.clone())
1387            } else {
1388                None
1389            }
1390        });
1391        let (pack_assets_dir, assets_tempdir) =
1392            locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
1393        let setup_yaml_exists = pack_assets_dir
1394            .as_ref()
1395            .map(|dir| dir.join("setup.yaml").is_file())
1396            .unwrap_or(false);
1397        tracing::info!(
1398            pack_root = %safe_path.display(),
1399            assets_setup_yaml_exists = setup_yaml_exists,
1400            "pack unpack metadata"
1401        );
1402
1403        if let Some(root) = materialized_root.as_ref() {
1404            match load_manifest_and_flows_from_dir(root) {
1405                Ok(ManifestLoad::New {
1406                    manifest: m,
1407                    flows: cache,
1408                }) => {
1409                    metadata = cache.metadata.clone();
1410                    manifest = Some(*m);
1411                    flows = Some(cache);
1412                }
1413                Ok(ManifestLoad::Legacy {
1414                    manifest: m,
1415                    flows: cache,
1416                }) => {
1417                    metadata = cache.metadata.clone();
1418                    legacy_manifest = Some(m);
1419                    flows = Some(cache);
1420                }
1421                Err(err) => {
1422                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1423                }
1424            }
1425        }
1426
1427        if manifest.is_none()
1428            && legacy_manifest.is_none()
1429            && let Some(archive_path) = archive_hint
1430        {
1431            let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
1432                format!(
1433                    "failed to load manifest.cbor from {}",
1434                    archive_path.display()
1435                )
1436            })?;
1437            match manifest_load {
1438                ManifestLoad::New {
1439                    manifest: m,
1440                    flows: cache,
1441                } => {
1442                    metadata = cache.metadata.clone();
1443                    manifest = Some(*m);
1444                    flows = Some(cache);
1445                }
1446                ManifestLoad::Legacy {
1447                    manifest: m,
1448                    flows: cache,
1449                } => {
1450                    metadata = cache.metadata.clone();
1451                    legacy_manifest = Some(m);
1452                    flows = Some(cache);
1453                }
1454            }
1455        }
1456        #[cfg(feature = "fault-injection")]
1457        {
1458            let fault_ctx = FaultContext {
1459                pack_id: metadata.pack_id.as_str(),
1460                flow_id: "unknown",
1461                node_id: None,
1462                attempt: 1,
1463            };
1464            maybe_fail(FaultPoint::PackResolve, fault_ctx)
1465                .map_err(|err| anyhow!(err.to_string()))?;
1466        }
1467        let mut pack_lock = None;
1468        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1469            pack_lock = load_pack_lock(&root)?;
1470            if pack_lock.is_some() {
1471                break;
1472            }
1473        }
1474        let component_sources_payload = if pack_lock.is_none() {
1475            if let Some(manifest) = manifest.as_ref() {
1476                manifest
1477                    .get_component_sources_v1()
1478                    .context("invalid component sources extension")?
1479            } else {
1480                None
1481            }
1482        } else {
1483            None
1484        };
1485        let component_sources = if let Some(lock) = pack_lock.as_ref() {
1486            Some(component_sources_table_from_pack_lock(
1487                lock,
1488                component_resolution.allow_missing_hash,
1489            )?)
1490        } else {
1491            component_sources_table(component_sources_payload.as_ref())?
1492        };
1493        let components = if is_component {
1494            let wasm_bytes = fs::read(&safe_path).await?;
1495            metadata = PackMetadata::from_wasm(&wasm_bytes)
1496                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1497            let name = safe_path
1498                .file_stem()
1499                .map(|s| s.to_string_lossy().to_string())
1500                .unwrap_or_else(|| "component".to_string());
1501            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1502            let mut map = HashMap::new();
1503            map.insert(
1504                name.clone(),
1505                PackComponent {
1506                    name,
1507                    version: metadata.version.clone(),
1508                    component,
1509                },
1510            );
1511            map
1512        } else {
1513            let specs = component_specs(
1514                manifest.as_ref(),
1515                legacy_manifest.as_deref(),
1516                component_sources_payload.as_ref(),
1517                pack_lock.as_ref(),
1518            );
1519            if specs.is_empty() {
1520                HashMap::new()
1521            } else {
1522                let mut loaded = HashMap::new();
1523                let mut missing: HashSet<String> =
1524                    specs.iter().map(|spec| spec.id.clone()).collect();
1525                let mut searched = Vec::new();
1526
1527                if !component_resolution.overrides.is_empty() {
1528                    load_components_from_overrides(
1529                        &cache,
1530                        &engine,
1531                        &component_resolution.overrides,
1532                        &specs,
1533                        &mut missing,
1534                        &mut loaded,
1535                    )
1536                    .await?;
1537                    searched.push("override map".to_string());
1538                }
1539
1540                if let Some(component_sources) = component_sources.as_ref() {
1541                    load_components_from_sources(
1542                        &cache,
1543                        &engine,
1544                        component_sources,
1545                        &component_resolution,
1546                        &specs,
1547                        &mut missing,
1548                        &mut loaded,
1549                        materialized_root.as_deref(),
1550                        archive_hint,
1551                    )
1552                    .await?;
1553                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1554                }
1555
1556                if let Some(root) = materialized_root.as_ref() {
1557                    load_components_from_dir(
1558                        &cache,
1559                        &engine,
1560                        root,
1561                        &specs,
1562                        &mut missing,
1563                        &mut loaded,
1564                    )
1565                    .await?;
1566                    searched.push(format!("components dir {}", root.display()));
1567                }
1568
1569                if let Some(archive_path) = archive_hint {
1570                    load_components_from_archive(
1571                        &cache,
1572                        &engine,
1573                        archive_path,
1574                        &specs,
1575                        &mut missing,
1576                        &mut loaded,
1577                    )
1578                    .await?;
1579                    searched.push(format!("archive {}", archive_path.display()));
1580                }
1581
1582                if !missing.is_empty() {
1583                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1584                    let sources = if searched.is_empty() {
1585                        "no component sources".to_string()
1586                    } else {
1587                        searched.join(", ")
1588                    };
1589                    bail!(
1590                        "components missing: {}; looked in {}",
1591                        missing_list,
1592                        sources
1593                    );
1594                }
1595
1596                loaded
1597            }
1598        };
1599        let http_client = Arc::clone(&HTTP_CLIENT);
1600        let mut component_manifests = HashMap::new();
1601        if let Some(manifest) = manifest.as_ref() {
1602            for component in &manifest.components {
1603                component_manifests.insert(component.id.as_str().to_string(), component.clone());
1604            }
1605        }
1606        let mut pack_policy = (*wasi_policy).clone();
1607        if let Some(dir) = pack_assets_dir {
1608            tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
1609            pack_policy =
1610                pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
1611        }
1612        let wasi_policy = Arc::new(pack_policy);
1613        Ok(Self {
1614            path: safe_path,
1615            archive_path: archive_hint.map(Path::to_path_buf),
1616            config,
1617            engine,
1618            metadata,
1619            manifest,
1620            legacy_manifest,
1621            component_manifests,
1622            mocks,
1623            flows,
1624            components,
1625            http_client,
1626            pre_cache: Mutex::new(HashMap::new()),
1627            session_store,
1628            state_store,
1629            wasi_policy,
1630            assets_tempdir,
1631            provider_registry: RwLock::new(None),
1632            secrets,
1633            oauth_config,
1634            cache,
1635        })
1636    }
1637
1638    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1639        if let Some(cache) = &self.flows {
1640            return Ok(cache.descriptors.clone());
1641        }
1642        if let Some(manifest) = &self.manifest {
1643            let descriptors = manifest
1644                .flows
1645                .iter()
1646                .map(|flow| FlowDescriptor {
1647                    id: flow.id.as_str().to_string(),
1648                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1649                    pack_id: manifest.pack_id.as_str().to_string(),
1650                    profile: manifest.pack_id.as_str().to_string(),
1651                    version: manifest.version.to_string(),
1652                    description: None,
1653                })
1654                .collect();
1655            return Ok(descriptors);
1656        }
1657        Ok(Vec::new())
1658    }
1659
1660    #[allow(dead_code)]
1661    pub async fn run_flow(
1662        &self,
1663        flow_id: &str,
1664        input: serde_json::Value,
1665    ) -> Result<serde_json::Value> {
1666        let pack = Arc::new(
1667            PackRuntime::load(
1668                &self.path,
1669                Arc::clone(&self.config),
1670                self.mocks.clone(),
1671                self.archive_path.as_deref(),
1672                self.session_store.clone(),
1673                self.state_store.clone(),
1674                Arc::clone(&self.wasi_policy),
1675                self.secrets.clone(),
1676                self.oauth_config.clone(),
1677                false,
1678                ComponentResolution::default(),
1679            )
1680            .await?,
1681        );
1682
1683        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1684        let retry_config = self.config.retry_config().into();
1685        let mocks = pack.mocks.as_deref();
1686        let tenant = self.config.tenant.as_str();
1687
1688        let ctx = FlowContext {
1689            tenant,
1690            pack_id: pack.metadata().pack_id.as_str(),
1691            flow_id,
1692            node_id: None,
1693            tool: None,
1694            action: None,
1695            session_id: None,
1696            provider_id: None,
1697            retry_config,
1698            attempt: 1,
1699            observer: None,
1700            mocks,
1701        };
1702
1703        let execution = engine.execute(ctx, input).await?;
1704        match execution.status {
1705            FlowStatus::Completed => Ok(execution.output),
1706            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1707                "status": "pending",
1708                "reason": wait.reason,
1709                "resume": wait.snapshot,
1710                "response": execution.output,
1711            })),
1712        }
1713    }
1714
1715    pub async fn invoke_component(
1716        &self,
1717        component_ref: &str,
1718        ctx: ComponentExecCtx,
1719        operation: &str,
1720        _config_json: Option<String>,
1721        input_json: String,
1722    ) -> Result<Value> {
1723        let pack_component = self
1724            .components
1725            .get(component_ref)
1726            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1727        let engine = self.engine.clone();
1728        let config = Arc::clone(&self.config);
1729        let http_client = Arc::clone(&self.http_client);
1730        let mocks = self.mocks.clone();
1731        let session_store = self.session_store.clone();
1732        let state_store = self.state_store.clone();
1733        let secrets = Arc::clone(&self.secrets);
1734        let oauth_config = self.oauth_config.clone();
1735        let wasi_policy = Arc::clone(&self.wasi_policy);
1736        let pack_id = self.metadata().pack_id.clone();
1737        let allow_state_store = self.allows_state_store(component_ref);
1738        let component = pack_component.component.clone();
1739        let component_ref_owned = component_ref.to_string();
1740        let operation_owned = operation.to_string();
1741        let input_owned = input_json;
1742        let ctx_owned = ctx;
1743
1744        run_on_wasi_thread("component.invoke", move || {
1745            let mut linker = Linker::new(&engine);
1746            register_all(&mut linker, allow_state_store)?;
1747            add_component_control_to_linker(&mut linker)?;
1748
1749            let host_state = HostState::new(
1750                pack_id.clone(),
1751                config,
1752                http_client,
1753                mocks,
1754                session_store,
1755                state_store,
1756                secrets,
1757                oauth_config,
1758                Some(ctx_owned.clone()),
1759                Some(component_ref_owned.clone()),
1760                false,
1761            )?;
1762            let store_state = ComponentState::new(host_state, wasi_policy)?;
1763            let mut store = wasmtime::Store::new(&engine, store_state);
1764
1765            let invoke_result = HostState::instantiate_component_result(
1766                &mut linker,
1767                &mut store,
1768                &component,
1769                &ctx_owned,
1770                &operation_owned,
1771                &input_owned,
1772            )?;
1773            HostState::convert_invoke_result(invoke_result)
1774        })
1775    }
1776
1777    pub fn resolve_provider(
1778        &self,
1779        provider_id: Option<&str>,
1780        provider_type: Option<&str>,
1781    ) -> Result<ProviderBinding> {
1782        let registry = self.provider_registry()?;
1783        registry.resolve(provider_id, provider_type)
1784    }
1785
1786    pub async fn invoke_provider(
1787        &self,
1788        binding: &ProviderBinding,
1789        ctx: ComponentExecCtx,
1790        op: &str,
1791        input_json: Vec<u8>,
1792    ) -> Result<Value> {
1793        let component_ref_owned = binding.component_ref.clone();
1794        let pack_component = self.components.get(&component_ref_owned).with_context(|| {
1795            format!("provider component '{component_ref_owned}' not found in pack")
1796        })?;
1797        let component = pack_component.component.clone();
1798
1799        let engine = self.engine.clone();
1800        let config = Arc::clone(&self.config);
1801        let http_client = Arc::clone(&self.http_client);
1802        let mocks = self.mocks.clone();
1803        let session_store = self.session_store.clone();
1804        let state_store = self.state_store.clone();
1805        let secrets = Arc::clone(&self.secrets);
1806        let oauth_config = self.oauth_config.clone();
1807        let wasi_policy = Arc::clone(&self.wasi_policy);
1808        let pack_id = self.metadata().pack_id.clone();
1809        let allow_state_store = self.allows_state_store(&component_ref_owned);
1810        let input_owned = input_json;
1811        let op_owned = op.to_string();
1812        let ctx_owned = ctx;
1813        let world = binding.world.clone();
1814
1815        run_on_wasi_thread("provider.invoke", move || {
1816            let mut linker = Linker::new(&engine);
1817            register_all(&mut linker, allow_state_store)?;
1818            add_component_control_to_linker(&mut linker)?;
1819            let mut pre_instance = Some(linker.instantiate_pre(component.as_ref())?);
1820            let host_state = HostState::new(
1821                pack_id.clone(),
1822                config,
1823                http_client,
1824                mocks,
1825                session_store,
1826                state_store,
1827                secrets,
1828                oauth_config,
1829                Some(ctx_owned.clone()),
1830                Some(component_ref_owned.clone()),
1831                true,
1832            )?;
1833            let store_state = ComponentState::new(host_state, wasi_policy)?;
1834            let mut store = wasmtime::Store::new(&engine, store_state);
1835            let use_schema_core =
1836                world.contains("provider-schema-core") || world.contains("provider/schema-core");
1837            let result = if use_schema_core {
1838                let pre_instance = pre_instance
1839                    .take()
1840                    .ok_or_else(|| anyhow!("provider pre_instance already consumed"))?;
1841                let pre: SchemaSchemaCorePre<ComponentState> =
1842                    SchemaSchemaCorePre::new(pre_instance)?;
1843                let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
1844                let provider = bindings.greentic_provider_schema_core_schema_core_api();
1845                provider.call_invoke(&mut store, &op_owned, &input_owned)?
1846            } else {
1847                let pre_instance = pre_instance
1848                    .take()
1849                    .ok_or_else(|| anyhow!("provider pre_instance already consumed"))?;
1850                let pre: LegacySchemaCorePre<ComponentState> =
1851                    LegacySchemaCorePre::new(pre_instance)?;
1852                let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
1853                let provider = bindings.greentic_provider_core_schema_core_api();
1854                provider.call_invoke(&mut store, &op_owned, &input_owned)?
1855            };
1856            deserialize_json_bytes(result)
1857        })
1858    }
1859
1860    pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
1861        if let Some(registry) = self.provider_registry.read().clone() {
1862            return Ok(registry);
1863        }
1864        let manifest = self
1865            .manifest
1866            .as_ref()
1867            .context("pack manifest required for provider resolution")?;
1868        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1869        let registry = ProviderRegistry::new(
1870            manifest,
1871            self.state_store.clone(),
1872            &self.config.tenant,
1873            &env,
1874        )?;
1875        *self.provider_registry.write() = Some(registry.clone());
1876        Ok(registry)
1877    }
1878
1879    pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
1880        if self.manifest.is_none() {
1881            return Ok(None);
1882        }
1883        Ok(Some(self.provider_registry()?))
1884    }
1885
1886    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1887        if let Some(cache) = &self.flows {
1888            return cache
1889                .flows
1890                .get(flow_id)
1891                .cloned()
1892                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1893        }
1894        if let Some(manifest) = &self.manifest {
1895            let entry = manifest
1896                .flows
1897                .iter()
1898                .find(|f| f.id.as_str() == flow_id)
1899                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1900            return Ok(entry.flow.clone());
1901        }
1902        bail!("flow '{flow_id}' not available (pack exports disabled)")
1903    }
1904
1905    pub fn metadata(&self) -> &PackMetadata {
1906        &self.metadata
1907    }
1908
1909    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1910        &self.metadata.secret_requirements
1911    }
1912
1913    pub fn missing_secrets(
1914        &self,
1915        tenant_ctx: &TypesTenantCtx,
1916    ) -> Vec<greentic_types::SecretRequirement> {
1917        let env = tenant_ctx.env.as_str().to_string();
1918        let tenant = tenant_ctx.tenant.as_str().to_string();
1919        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1920        self.required_secrets()
1921            .iter()
1922            .filter(|req| {
1923                // scope must match current context if provided
1924                if let Some(scope) = &req.scope {
1925                    if scope.env != env {
1926                        return false;
1927                    }
1928                    if scope.tenant != tenant {
1929                        return false;
1930                    }
1931                    if let Some(ref team_req) = scope.team
1932                        && team.as_ref() != Some(team_req)
1933                    {
1934                        return false;
1935                    }
1936                }
1937                let ctx = self.config.tenant_ctx();
1938                read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1939            })
1940            .cloned()
1941            .collect()
1942    }
1943
1944    pub fn for_component_test(
1945        components: Vec<(String, PathBuf)>,
1946        flows: HashMap<String, FlowIR>,
1947        pack_id: &str,
1948        config: Arc<HostConfig>,
1949    ) -> Result<Self> {
1950        let engine = Engine::default();
1951        let engine_profile =
1952            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1953        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1954        let mut component_map = HashMap::new();
1955        for (name, path) in components {
1956            if !path.exists() {
1957                bail!("component artifact missing: {}", path.display());
1958            }
1959            let wasm_bytes = std::fs::read(&path)?;
1960            let component = Arc::new(
1961                Component::from_binary(&engine, &wasm_bytes)
1962                    .with_context(|| format!("failed to compile component {}", path.display()))?,
1963            );
1964            component_map.insert(
1965                name.clone(),
1966                PackComponent {
1967                    name,
1968                    version: "0.0.0".into(),
1969                    component,
1970                },
1971            );
1972        }
1973
1974        let mut flow_map = HashMap::new();
1975        let mut descriptors = Vec::new();
1976        for (id, ir) in flows {
1977            let flow_type = ir.flow_type.clone();
1978            let flow = flow_ir_to_flow(ir)?;
1979            flow_map.insert(id.clone(), flow);
1980            descriptors.push(FlowDescriptor {
1981                id: id.clone(),
1982                flow_type,
1983                pack_id: pack_id.to_string(),
1984                profile: "test".into(),
1985                version: "0.0.0".into(),
1986                description: None,
1987            });
1988        }
1989        let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1990        let metadata = PackMetadata {
1991            pack_id: pack_id.to_string(),
1992            version: "0.0.0".into(),
1993            entry_flows,
1994            secret_requirements: Vec::new(),
1995        };
1996        let flows_cache = PackFlows {
1997            descriptors: descriptors.clone(),
1998            flows: flow_map,
1999            metadata: metadata.clone(),
2000        };
2001
2002        Ok(Self {
2003            path: PathBuf::new(),
2004            archive_path: None,
2005            config,
2006            engine,
2007            metadata,
2008            manifest: None,
2009            legacy_manifest: None,
2010            component_manifests: HashMap::new(),
2011            mocks: None,
2012            flows: Some(flows_cache),
2013            components: component_map,
2014            http_client: Arc::clone(&HTTP_CLIENT),
2015            pre_cache: Mutex::new(HashMap::new()),
2016            session_store: None,
2017            state_store: None,
2018            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
2019            assets_tempdir: None,
2020            provider_registry: RwLock::new(None),
2021            secrets: crate::secrets::default_manager()?,
2022            oauth_config: None,
2023            cache,
2024        })
2025    }
2026}
2027
2028fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
2029    let message = err.to_string();
2030    message.contains("no exported instance named")
2031        && message.contains(&format!("greentic:component/node@{version}"))
2032}
2033
2034struct PackFlows {
2035    descriptors: Vec<FlowDescriptor>,
2036    flows: HashMap<String, Flow>,
2037    metadata: PackMetadata,
2038}
2039
2040const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
2041    "greentic.pack.runtime_flow",
2042    "greentic.pack.flow_runtime",
2043    "greentic.pack.runtime_flows",
2044];
2045
2046#[derive(Debug, Deserialize)]
2047struct RuntimeFlowBundle {
2048    flows: Vec<RuntimeFlow>,
2049}
2050
2051#[derive(Debug, Deserialize)]
2052struct RuntimeFlow {
2053    id: String,
2054    #[serde(alias = "flow_type")]
2055    kind: FlowKind,
2056    #[serde(default)]
2057    schema_version: Option<String>,
2058    #[serde(default)]
2059    start: Option<String>,
2060    #[serde(default)]
2061    entrypoints: BTreeMap<String, Value>,
2062    nodes: BTreeMap<String, RuntimeNode>,
2063    #[serde(default)]
2064    metadata: Option<FlowMetadata>,
2065}
2066
2067#[derive(Debug, Deserialize)]
2068struct RuntimeNode {
2069    #[serde(alias = "component")]
2070    component_id: String,
2071    #[serde(default, alias = "operation")]
2072    operation_name: Option<String>,
2073    #[serde(default, alias = "payload", alias = "input")]
2074    operation_payload: Value,
2075    #[serde(default)]
2076    routing: Option<Routing>,
2077    #[serde(default)]
2078    telemetry: Option<TelemetryHints>,
2079}
2080
2081fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
2082    if bytes.is_empty() {
2083        return Ok(Value::Null);
2084    }
2085    serde_json::from_slice(&bytes).or_else(|_| {
2086        String::from_utf8(bytes)
2087            .map(Value::String)
2088            .map_err(|err| anyhow!(err))
2089    })
2090}
2091
2092impl PackFlows {
2093    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
2094        if let Some(flows) = flows_from_runtime_extension(&manifest) {
2095            return flows;
2096        }
2097        let descriptors = manifest
2098            .flows
2099            .iter()
2100            .map(|entry| FlowDescriptor {
2101                id: entry.id.as_str().to_string(),
2102                flow_type: flow_kind_to_str(entry.kind).to_string(),
2103                pack_id: manifest.pack_id.as_str().to_string(),
2104                profile: manifest.pack_id.as_str().to_string(),
2105                version: manifest.version.to_string(),
2106                description: None,
2107            })
2108            .collect();
2109        let mut flows = HashMap::new();
2110        for entry in &manifest.flows {
2111            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
2112        }
2113        Self {
2114            metadata: PackMetadata::from_manifest(&manifest),
2115            descriptors,
2116            flows,
2117        }
2118    }
2119}
2120
2121fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
2122    let extensions = manifest.extensions.as_ref()?;
2123    let extension = extensions.iter().find_map(|(key, ext)| {
2124        if RUNTIME_FLOW_EXTENSION_IDS
2125            .iter()
2126            .any(|candidate| candidate == key)
2127        {
2128            Some(ext)
2129        } else {
2130            None
2131        }
2132    })?;
2133    let runtime_flows = match decode_runtime_flow_extension(extension) {
2134        Some(flows) if !flows.is_empty() => flows,
2135        _ => return None,
2136    };
2137
2138    let descriptors = runtime_flows
2139        .iter()
2140        .map(|flow| FlowDescriptor {
2141            id: flow.id.as_str().to_string(),
2142            flow_type: flow_kind_to_str(flow.kind).to_string(),
2143            pack_id: manifest.pack_id.as_str().to_string(),
2144            profile: manifest.pack_id.as_str().to_string(),
2145            version: manifest.version.to_string(),
2146            description: None,
2147        })
2148        .collect::<Vec<_>>();
2149    let flows = runtime_flows
2150        .into_iter()
2151        .map(|flow| (flow.id.as_str().to_string(), flow))
2152        .collect();
2153
2154    Some(PackFlows {
2155        metadata: PackMetadata::from_manifest(manifest),
2156        descriptors,
2157        flows,
2158    })
2159}
2160
2161fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
2162    let value = match extension.inline.as_ref()? {
2163        ExtensionInline::Other(value) => value.clone(),
2164        _ => return None,
2165    };
2166
2167    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
2168        return Some(collect_runtime_flows(bundle.flows));
2169    }
2170
2171    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
2172        return Some(collect_runtime_flows(flows));
2173    }
2174
2175    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
2176        return Some(flows);
2177    }
2178
2179    warn!(
2180        extension = %extension.kind,
2181        version = %extension.version,
2182        "runtime flow extension present but could not be decoded"
2183    );
2184    None
2185}
2186
2187fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
2188    flows
2189        .into_iter()
2190        .filter_map(|flow| match runtime_flow_to_flow(flow) {
2191            Ok(flow) => Some(flow),
2192            Err(err) => {
2193                warn!(error = %err, "failed to decode runtime flow");
2194                None
2195            }
2196        })
2197        .collect()
2198}
2199
2200fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
2201    let flow_id = FlowId::from_str(&runtime.id)
2202        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
2203    let mut entrypoints = runtime.entrypoints;
2204    if entrypoints.is_empty()
2205        && let Some(start) = &runtime.start
2206    {
2207        entrypoints.insert("default".into(), Value::String(start.clone()));
2208    }
2209
2210    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
2211    for (id, node) in runtime.nodes {
2212        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
2213        let component_id = ComponentId::from_str(&node.component_id)
2214            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
2215        let component = FlowComponentRef {
2216            id: component_id,
2217            pack_alias: None,
2218            operation: node.operation_name,
2219        };
2220        let routing = node.routing.unwrap_or(Routing::End);
2221        let telemetry = node.telemetry.unwrap_or_default();
2222        nodes.insert(
2223            node_id.clone(),
2224            Node {
2225                id: node_id,
2226                component,
2227                input: InputMapping {
2228                    mapping: node.operation_payload,
2229                },
2230                output: OutputMapping {
2231                    mapping: Value::Null,
2232                },
2233                routing,
2234                telemetry,
2235            },
2236        );
2237    }
2238
2239    Ok(Flow {
2240        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
2241        id: flow_id,
2242        kind: runtime.kind,
2243        entrypoints,
2244        nodes,
2245        metadata: runtime.metadata.unwrap_or_default(),
2246    })
2247}
2248
2249fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
2250    match kind {
2251        greentic_types::FlowKind::Messaging => "messaging",
2252        greentic_types::FlowKind::Event => "event",
2253        greentic_types::FlowKind::ComponentConfig => "component-config",
2254        greentic_types::FlowKind::Job => "job",
2255        greentic_types::FlowKind::Http => "http",
2256    }
2257}
2258
2259fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
2260    let mut file = archive
2261        .by_name(name)
2262        .with_context(|| format!("entry {name} missing from archive"))?;
2263    let mut buf = Vec::new();
2264    file.read_to_end(&mut buf)?;
2265    Ok(buf)
2266}
2267
2268fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
2269    for node in doc.nodes.values_mut() {
2270        let Some((component_ref, payload)) = node
2271            .raw
2272            .iter()
2273            .next()
2274            .map(|(key, value)| (key.clone(), value.clone()))
2275        else {
2276            continue;
2277        };
2278        if component_ref.starts_with("emit.") {
2279            node.operation = Some(component_ref);
2280            node.payload = payload;
2281            node.raw.clear();
2282            continue;
2283        }
2284        let (target_component, operation, input, config) =
2285            infer_component_exec(&payload, &component_ref);
2286        let mut payload_obj = serde_json::Map::new();
2287        // component.exec is meta; ensure the payload carries the actual target component.
2288        payload_obj.insert("component".into(), Value::String(target_component));
2289        payload_obj.insert("operation".into(), Value::String(operation));
2290        payload_obj.insert("input".into(), input);
2291        if let Some(cfg) = config {
2292            payload_obj.insert("config".into(), cfg);
2293        }
2294        node.operation = Some("component.exec".to_string());
2295        node.payload = Value::Object(payload_obj);
2296        node.raw.clear();
2297    }
2298    doc
2299}
2300
2301fn infer_component_exec(
2302    payload: &Value,
2303    component_ref: &str,
2304) -> (String, String, Value, Option<Value>) {
2305    let default_op = if component_ref.starts_with("templating.") {
2306        "render"
2307    } else {
2308        "invoke"
2309    }
2310    .to_string();
2311
2312    if let Value::Object(map) = payload {
2313        let op = map
2314            .get("op")
2315            .or_else(|| map.get("operation"))
2316            .and_then(Value::as_str)
2317            .map(|s| s.to_string())
2318            .unwrap_or_else(|| default_op.clone());
2319
2320        let mut input = map.clone();
2321        let config = input.remove("config");
2322        let component = input
2323            .get("component")
2324            .or_else(|| input.get("component_ref"))
2325            .and_then(Value::as_str)
2326            .map(|s| s.to_string())
2327            .unwrap_or_else(|| component_ref.to_string());
2328        input.remove("component");
2329        input.remove("component_ref");
2330        input.remove("op");
2331        input.remove("operation");
2332        return (component, op, Value::Object(input), config);
2333    }
2334
2335    (component_ref.to_string(), default_op, payload.clone(), None)
2336}
2337
2338#[derive(Clone, Debug)]
2339struct ComponentSpec {
2340    id: String,
2341    version: String,
2342    legacy_path: Option<String>,
2343}
2344
2345#[derive(Clone, Debug)]
2346struct ComponentSourceInfo {
2347    digest: Option<String>,
2348    source: ComponentSourceRef,
2349    artifact: ComponentArtifactLocation,
2350    expected_wasm_sha256: Option<String>,
2351    skip_digest_verification: bool,
2352}
2353
2354#[derive(Clone, Debug)]
2355enum ComponentArtifactLocation {
2356    Inline { wasm_path: String },
2357    Remote,
2358}
2359
2360#[derive(Clone, Debug, Deserialize)]
2361struct PackLockV1 {
2362    schema_version: u32,
2363    components: Vec<PackLockComponent>,
2364}
2365
2366#[derive(Clone, Debug, Deserialize)]
2367struct PackLockComponent {
2368    name: String,
2369    #[serde(default, rename = "source_ref")]
2370    source_ref: Option<String>,
2371    #[serde(default, rename = "ref")]
2372    legacy_ref: Option<String>,
2373    #[serde(default)]
2374    component_id: Option<ComponentId>,
2375    #[serde(default)]
2376    bundled: Option<bool>,
2377    #[serde(default, rename = "bundled_path")]
2378    bundled_path: Option<String>,
2379    #[serde(default, rename = "path")]
2380    legacy_path: Option<String>,
2381    #[serde(default)]
2382    wasm_sha256: Option<String>,
2383    #[serde(default, rename = "sha256")]
2384    legacy_sha256: Option<String>,
2385    #[serde(default)]
2386    resolved_digest: Option<String>,
2387    #[serde(default)]
2388    digest: Option<String>,
2389}
2390
2391fn component_specs(
2392    manifest: Option<&greentic_types::PackManifest>,
2393    legacy_manifest: Option<&legacy_pack::PackManifest>,
2394    component_sources: Option<&ComponentSourcesV1>,
2395    pack_lock: Option<&PackLockV1>,
2396) -> Vec<ComponentSpec> {
2397    if let Some(manifest) = manifest {
2398        if !manifest.components.is_empty() {
2399            return manifest
2400                .components
2401                .iter()
2402                .map(|entry| ComponentSpec {
2403                    id: entry.id.as_str().to_string(),
2404                    version: entry.version.to_string(),
2405                    legacy_path: None,
2406                })
2407                .collect();
2408        }
2409        if let Some(lock) = pack_lock {
2410            let mut seen = HashSet::new();
2411            let mut specs = Vec::new();
2412            for entry in &lock.components {
2413                let id = entry
2414                    .component_id
2415                    .as_ref()
2416                    .map(|id| id.as_str())
2417                    .unwrap_or(entry.name.as_str());
2418                if seen.insert(id.to_string()) {
2419                    specs.push(ComponentSpec {
2420                        id: id.to_string(),
2421                        version: "0.0.0".to_string(),
2422                        legacy_path: None,
2423                    });
2424                }
2425            }
2426            return specs;
2427        }
2428        if let Some(sources) = component_sources {
2429            let mut seen = HashSet::new();
2430            let mut specs = Vec::new();
2431            for entry in &sources.components {
2432                let id = entry
2433                    .component_id
2434                    .as_ref()
2435                    .map(|id| id.as_str())
2436                    .unwrap_or(entry.name.as_str());
2437                if seen.insert(id.to_string()) {
2438                    specs.push(ComponentSpec {
2439                        id: id.to_string(),
2440                        version: "0.0.0".to_string(),
2441                        legacy_path: None,
2442                    });
2443                }
2444            }
2445            return specs;
2446        }
2447    }
2448    if let Some(legacy_manifest) = legacy_manifest {
2449        return legacy_manifest
2450            .components
2451            .iter()
2452            .map(|entry| ComponentSpec {
2453                id: entry.name.clone(),
2454                version: entry.version.to_string(),
2455                legacy_path: Some(entry.file_wasm.clone()),
2456            })
2457            .collect();
2458    }
2459    Vec::new()
2460}
2461
2462fn component_sources_table(
2463    sources: Option<&ComponentSourcesV1>,
2464) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2465    let Some(sources) = sources else {
2466        return Ok(None);
2467    };
2468    let mut table = HashMap::new();
2469    for entry in &sources.components {
2470        let artifact = match &entry.artifact {
2471            ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2472                wasm_path: wasm_path.clone(),
2473            },
2474            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2475        };
2476        let info = ComponentSourceInfo {
2477            digest: Some(entry.resolved.digest.clone()),
2478            source: entry.source.clone(),
2479            artifact,
2480            expected_wasm_sha256: None,
2481            skip_digest_verification: false,
2482        };
2483        if let Some(component_id) = entry.component_id.as_ref() {
2484            table.insert(component_id.as_str().to_string(), info.clone());
2485        }
2486        table.insert(entry.name.clone(), info);
2487    }
2488    Ok(Some(table))
2489}
2490
2491fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2492    let lock_path = if path.is_dir() {
2493        let candidate = path.join("pack.lock");
2494        if candidate.exists() {
2495            Some(candidate)
2496        } else {
2497            let candidate = path.join("pack.lock.json");
2498            candidate.exists().then_some(candidate)
2499        }
2500    } else {
2501        None
2502    };
2503    let Some(lock_path) = lock_path else {
2504        return Ok(None);
2505    };
2506    let raw = std::fs::read_to_string(&lock_path)
2507        .with_context(|| format!("failed to read {}", lock_path.display()))?;
2508    let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2509    if lock.schema_version != 1 {
2510        bail!("pack.lock schema_version must be 1");
2511    }
2512    Ok(Some(lock))
2513}
2514
2515fn find_pack_lock_roots(
2516    pack_path: &Path,
2517    is_dir: bool,
2518    archive_hint: Option<&Path>,
2519) -> Vec<PathBuf> {
2520    if is_dir {
2521        return vec![pack_path.to_path_buf()];
2522    }
2523    let mut roots = Vec::new();
2524    if let Some(archive_path) = archive_hint {
2525        if let Some(parent) = archive_path.parent() {
2526            roots.push(parent.to_path_buf());
2527            if let Some(grandparent) = parent.parent() {
2528                roots.push(grandparent.to_path_buf());
2529            }
2530        }
2531    } else if let Some(parent) = pack_path.parent() {
2532        roots.push(parent.to_path_buf());
2533        if let Some(grandparent) = parent.parent() {
2534            roots.push(grandparent.to_path_buf());
2535        }
2536    }
2537    roots
2538}
2539
2540fn normalize_sha256(digest: &str) -> Result<String> {
2541    let trimmed = digest.trim();
2542    if trimmed.is_empty() {
2543        bail!("sha256 digest cannot be empty");
2544    }
2545    if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2546        if stripped.is_empty() {
2547            bail!("sha256 digest must include hex bytes after sha256:");
2548        }
2549        return Ok(trimmed.to_string());
2550    }
2551    if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2552        return Ok(format!("sha256:{trimmed}"));
2553    }
2554    bail!("sha256 digest must be hex or sha256:<hex>");
2555}
2556
2557fn component_sources_table_from_pack_lock(
2558    lock: &PackLockV1,
2559    allow_missing_hash: bool,
2560) -> Result<HashMap<String, ComponentSourceInfo>> {
2561    let mut table = HashMap::new();
2562    let mut names = HashSet::new();
2563    for entry in &lock.components {
2564        if !names.insert(entry.name.clone()) {
2565            bail!(
2566                "pack.lock contains duplicate component name `{}`",
2567                entry.name
2568            );
2569        }
2570        let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2571            (Some(primary), Some(legacy)) => {
2572                if primary != legacy {
2573                    bail!(
2574                        "pack.lock component {} has conflicting refs: {} vs {}",
2575                        entry.name,
2576                        primary,
2577                        legacy
2578                    );
2579                }
2580                primary.as_str()
2581            }
2582            (Some(primary), None) => primary.as_str(),
2583            (None, Some(legacy)) => legacy.as_str(),
2584            (None, None) => {
2585                bail!("pack.lock component {} missing source_ref", entry.name);
2586            }
2587        };
2588        let source: ComponentSourceRef = source_ref
2589            .parse()
2590            .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2591        let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2592            (Some(primary), Some(legacy)) => {
2593                if primary != legacy {
2594                    bail!(
2595                        "pack.lock component {} has conflicting bundled paths: {} vs {}",
2596                        entry.name,
2597                        primary,
2598                        legacy
2599                    );
2600                }
2601                Some(primary.clone())
2602            }
2603            (Some(primary), None) => Some(primary.clone()),
2604            (None, Some(legacy)) => Some(legacy.clone()),
2605            (None, None) => None,
2606        };
2607        let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2608        let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2609            let wasm_path = bundled_path.ok_or_else(|| {
2610                anyhow!(
2611                    "pack.lock component {} marked bundled but bundled_path is missing",
2612                    entry.name
2613                )
2614            })?;
2615            let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2616                (Some(primary), Some(legacy)) => {
2617                    if primary != legacy {
2618                        bail!(
2619                            "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2620                            entry.name,
2621                            primary,
2622                            legacy
2623                        );
2624                    }
2625                    Some(primary.as_str())
2626                }
2627                (Some(primary), None) => Some(primary.as_str()),
2628                (None, Some(legacy)) => Some(legacy.as_str()),
2629                (None, None) => None,
2630            };
2631            let expected = match expected_raw {
2632                Some(value) => Some(normalize_sha256(value)?),
2633                None => None,
2634            };
2635            if expected.is_none() && !allow_missing_hash {
2636                bail!(
2637                    "pack.lock component {} missing wasm_sha256 for bundled component",
2638                    entry.name
2639                );
2640            }
2641            (
2642                ComponentArtifactLocation::Inline { wasm_path },
2643                expected.clone(),
2644                expected,
2645                allow_missing_hash && expected_raw.is_none(),
2646            )
2647        } else {
2648            if source.is_tag() {
2649                bail!(
2650                    "component {} uses tag ref {} but is not bundled; rebuild the pack",
2651                    entry.name,
2652                    source
2653                );
2654            }
2655            let expected = entry
2656                .resolved_digest
2657                .as_deref()
2658                .or(entry.digest.as_deref())
2659                .ok_or_else(|| {
2660                    anyhow!(
2661                        "pack.lock component {} missing resolved_digest for remote component",
2662                        entry.name
2663                    )
2664                })?;
2665            (
2666                ComponentArtifactLocation::Remote,
2667                Some(normalize_digest(expected)),
2668                None,
2669                false,
2670            )
2671        };
2672        let info = ComponentSourceInfo {
2673            digest,
2674            source,
2675            artifact,
2676            expected_wasm_sha256,
2677            skip_digest_verification,
2678        };
2679        if let Some(component_id) = entry.component_id.as_ref() {
2680            let key = component_id.as_str().to_string();
2681            if table.contains_key(&key) {
2682                bail!(
2683                    "pack.lock contains duplicate component id `{}`",
2684                    component_id.as_str()
2685                );
2686            }
2687            table.insert(key, info.clone());
2688        }
2689        if entry.name
2690            != entry
2691                .component_id
2692                .as_ref()
2693                .map(|id| id.as_str())
2694                .unwrap_or("")
2695        {
2696            table.insert(entry.name.clone(), info);
2697        }
2698    }
2699    Ok(table)
2700}
2701
2702fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2703    if let Some(path) = &spec.legacy_path {
2704        return root.join(path);
2705    }
2706    root.join("components").join(format!("{}.wasm", spec.id))
2707}
2708
2709fn normalize_digest(digest: &str) -> String {
2710    if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
2711        digest.to_string()
2712    } else {
2713        format!("sha256:{digest}")
2714    }
2715}
2716
2717fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2718    if digest.starts_with("blake3:") {
2719        let hash = blake3::hash(bytes);
2720        return Ok(format!("blake3:{}", hash.to_hex()));
2721    }
2722    let mut hasher = sha2::Sha256::new();
2723    hasher.update(bytes);
2724    Ok(format!("sha256:{:x}", hasher.finalize()))
2725}
2726
2727fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2728    let mut hasher = sha2::Sha256::new();
2729    hasher.update(bytes);
2730    format!("sha256:{:x}", hasher.finalize())
2731}
2732
2733fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2734    let wasm_digest = digest
2735        .map(normalize_digest)
2736        .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2737    ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2738}
2739
2740async fn compile_component_with_cache(
2741    cache: &CacheManager,
2742    engine: &Engine,
2743    digest: Option<&str>,
2744    bytes: Vec<u8>,
2745) -> Result<Arc<Component>> {
2746    let key = build_artifact_key(cache, digest, &bytes);
2747    cache.get_component(engine, &key, || Ok(bytes)).await
2748}
2749
2750fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2751    let normalized_expected = normalize_digest(expected);
2752    let actual = compute_digest_for(bytes, &normalized_expected)?;
2753    if normalize_digest(&actual) != normalized_expected {
2754        bail!(
2755            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2756        );
2757    }
2758    Ok(())
2759}
2760
2761fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2762    let normalized_expected = normalize_sha256(expected)?;
2763    let actual = compute_sha256_digest_for(bytes);
2764    if actual != normalized_expected {
2765        bail!(
2766            "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2767        );
2768    }
2769    Ok(())
2770}
2771
2772#[cfg(test)]
2773mod pack_lock_tests {
2774    use super::*;
2775    use tempfile::TempDir;
2776
2777    #[test]
2778    fn pack_lock_tag_ref_requires_bundle() {
2779        let lock = PackLockV1 {
2780            schema_version: 1,
2781            components: vec![PackLockComponent {
2782                name: "templates".to_string(),
2783                source_ref: Some("oci://registry.test/templates:latest".to_string()),
2784                legacy_ref: None,
2785                component_id: None,
2786                bundled: Some(false),
2787                bundled_path: None,
2788                legacy_path: None,
2789                wasm_sha256: None,
2790                legacy_sha256: None,
2791                resolved_digest: None,
2792                digest: None,
2793            }],
2794        };
2795        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
2796        assert!(
2797            err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
2798            "unexpected error: {err}"
2799        );
2800    }
2801
2802    #[test]
2803    fn bundled_hash_mismatch_errors() {
2804        let rt = tokio::runtime::Runtime::new().expect("runtime");
2805        let temp = TempDir::new().expect("temp dir");
2806        let engine = Engine::default();
2807        let engine_profile =
2808            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2809        let cache_config = CacheConfig {
2810            root: temp.path().join("cache"),
2811            ..CacheConfig::default()
2812        };
2813        let cache = CacheManager::new(cache_config, engine_profile);
2814        let wasm_path = temp.path().join("component.wasm");
2815        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
2816            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
2817        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
2818        std::fs::write(&wasm_path, &bytes).expect("write temp wasm");
2819
2820        let spec = ComponentSpec {
2821            id: "qa.process".to_string(),
2822            version: "0.0.0".to_string(),
2823            legacy_path: None,
2824        };
2825        let mut missing = HashSet::new();
2826        missing.insert(spec.id.clone());
2827
2828        let mut sources = HashMap::new();
2829        sources.insert(
2830            spec.id.clone(),
2831            ComponentSourceInfo {
2832                digest: Some("sha256:deadbeef".to_string()),
2833                source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
2834                artifact: ComponentArtifactLocation::Inline {
2835                    wasm_path: "component.wasm".to_string(),
2836                },
2837                expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
2838                skip_digest_verification: false,
2839            },
2840        );
2841
2842        let mut loaded = HashMap::new();
2843        let result = rt.block_on(load_components_from_sources(
2844            &cache,
2845            &engine,
2846            &sources,
2847            &ComponentResolution::default(),
2848            &[spec],
2849            &mut missing,
2850            &mut loaded,
2851            Some(temp.path()),
2852            None,
2853        ));
2854        let err = result.unwrap_err();
2855        assert!(
2856            err.to_string().contains("bundled digest mismatch"),
2857            "unexpected error: {err}"
2858        );
2859    }
2860}
2861
2862#[cfg(test)]
2863mod pack_resolution_prop_tests {
2864    use super::*;
2865    use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
2866    use proptest::prelude::*;
2867    use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
2868    use std::collections::BTreeSet;
2869    use std::path::Path;
2870    use std::str::FromStr;
2871
2872    #[derive(Clone, Debug)]
2873    enum ResolveRequest {
2874        ById(String),
2875        ByName(String),
2876    }
2877
2878    #[derive(Clone, Debug, PartialEq, Eq)]
2879    struct ResolvedComponent {
2880        key: String,
2881        source: String,
2882        artifact: String,
2883        digest: Option<String>,
2884        expected_wasm_sha256: Option<String>,
2885        skip_digest_verification: bool,
2886    }
2887
2888    #[derive(Clone, Debug, PartialEq, Eq)]
2889    struct ResolveError {
2890        code: String,
2891        message: String,
2892        context_key: String,
2893    }
2894
2895    #[derive(Clone, Debug)]
2896    struct Scenario {
2897        pack_lock: Option<PackLockV1>,
2898        component_sources: Option<ComponentSourcesV1>,
2899        request: ResolveRequest,
2900        expected_sha256: Option<String>,
2901        bytes: Vec<u8>,
2902    }
2903
2904    fn resolve_component_test(
2905        sources: Option<&ComponentSourcesV1>,
2906        lock: Option<&PackLockV1>,
2907        request: &ResolveRequest,
2908    ) -> Result<ResolvedComponent, ResolveError> {
2909        let table = if let Some(lock) = lock {
2910            component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
2911                code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
2912                message: err.to_string(),
2913                context_key: request_key(request).to_string(),
2914            })?
2915        } else {
2916            let sources = component_sources_table(sources).map_err(|err| ResolveError {
2917                code: "component_sources_error".to_string(),
2918                message: err.to_string(),
2919                context_key: request_key(request).to_string(),
2920            })?;
2921            sources.ok_or_else(|| ResolveError {
2922                code: "missing_component_sources".to_string(),
2923                message: "component sources not provided".to_string(),
2924                context_key: request_key(request).to_string(),
2925            })?
2926        };
2927
2928        let key = request_key(request);
2929        let source = table.get(key).ok_or_else(|| ResolveError {
2930            code: "component_not_found".to_string(),
2931            message: format!("component {key} not found"),
2932            context_key: key.to_string(),
2933        })?;
2934
2935        Ok(ResolvedComponent {
2936            key: key.to_string(),
2937            source: source.source.to_string(),
2938            artifact: match source.artifact {
2939                ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
2940                ComponentArtifactLocation::Remote => "remote".to_string(),
2941            },
2942            digest: source.digest.clone(),
2943            expected_wasm_sha256: source.expected_wasm_sha256.clone(),
2944            skip_digest_verification: source.skip_digest_verification,
2945        })
2946    }
2947
2948    fn request_key(request: &ResolveRequest) -> &str {
2949        match request {
2950            ResolveRequest::ById(value) => value.as_str(),
2951            ResolveRequest::ByName(value) => value.as_str(),
2952        }
2953    }
2954
2955    fn classify_pack_lock_error(message: &str) -> &'static str {
2956        if message.contains("duplicate component name") {
2957            "duplicate_name"
2958        } else if message.contains("duplicate component id") {
2959            "duplicate_id"
2960        } else if message.contains("conflicting refs") {
2961            "conflicting_ref"
2962        } else if message.contains("conflicting bundled paths") {
2963            "conflicting_bundled_path"
2964        } else if message.contains("conflicting wasm_sha256") {
2965            "conflicting_wasm_sha256"
2966        } else if message.contains("missing source_ref") {
2967            "missing_source_ref"
2968        } else if message.contains("marked bundled but bundled_path is missing") {
2969            "missing_bundled_path"
2970        } else if message.contains("missing wasm_sha256") {
2971            "missing_wasm_sha256"
2972        } else if message.contains("tag ref") && message.contains("not bundled") {
2973            "tag_ref_requires_bundle"
2974        } else if message.contains("missing resolved_digest") {
2975            "missing_resolved_digest"
2976        } else if message.contains("invalid component ref") {
2977            "invalid_component_ref"
2978        } else if message.contains("sha256 digest") {
2979            "invalid_sha256"
2980        } else {
2981            "unknown_error"
2982        }
2983    }
2984
2985    fn known_error_codes() -> BTreeSet<&'static str> {
2986        [
2987            "component_sources_error",
2988            "missing_component_sources",
2989            "component_not_found",
2990            "duplicate_name",
2991            "duplicate_id",
2992            "conflicting_ref",
2993            "conflicting_bundled_path",
2994            "conflicting_wasm_sha256",
2995            "missing_source_ref",
2996            "missing_bundled_path",
2997            "missing_wasm_sha256",
2998            "tag_ref_requires_bundle",
2999            "missing_resolved_digest",
3000            "invalid_component_ref",
3001            "invalid_sha256",
3002            "unknown_error",
3003        ]
3004        .into_iter()
3005        .collect()
3006    }
3007
3008    fn proptest_config() -> ProptestConfig {
3009        let cases = std::env::var("PROPTEST_CASES")
3010            .ok()
3011            .and_then(|value| value.parse::<u32>().ok())
3012            .unwrap_or(128);
3013        ProptestConfig {
3014            cases,
3015            failure_persistence: None,
3016            ..ProptestConfig::default()
3017        }
3018    }
3019
3020    fn proptest_seed() -> Option<[u8; 32]> {
3021        let seed = std::env::var("PROPTEST_SEED")
3022            .ok()
3023            .and_then(|value| value.parse::<u64>().ok())?;
3024        let mut bytes = [0u8; 32];
3025        bytes[..8].copy_from_slice(&seed.to_le_bytes());
3026        Some(bytes)
3027    }
3028
3029    fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
3030        let config = ProptestConfig {
3031            cases,
3032            failure_persistence: None,
3033            ..ProptestConfig::default()
3034        };
3035        let mut runner = match seed {
3036            Some(bytes) => {
3037                TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
3038            }
3039            None => TestRunner::new(config),
3040        };
3041        runner
3042            .run(&strategy, |scenario| {
3043                run_scenario(&scenario);
3044                Ok(())
3045            })
3046            .unwrap();
3047    }
3048
3049    fn run_scenario(scenario: &Scenario) {
3050        let known_codes = known_error_codes();
3051        let first = resolve_component_test(
3052            scenario.component_sources.as_ref(),
3053            scenario.pack_lock.as_ref(),
3054            &scenario.request,
3055        );
3056        let second = resolve_component_test(
3057            scenario.component_sources.as_ref(),
3058            scenario.pack_lock.as_ref(),
3059            &scenario.request,
3060        );
3061        assert_eq!(normalize_result(&first), normalize_result(&second));
3062
3063        if let Some(lock) = scenario.pack_lock.as_ref() {
3064            let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
3065            assert_eq!(normalize_result(&first), normalize_result(&lock_only));
3066        }
3067
3068        if let Err(err) = first.as_ref() {
3069            assert!(
3070                known_codes.contains(err.code.as_str()),
3071                "unexpected error code {}: {}",
3072                err.code,
3073                err.message
3074            );
3075        }
3076
3077        if let Some(expected) = scenario.expected_sha256.as_deref() {
3078            let expected_ok =
3079                verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
3080            let actual = compute_sha256_digest_for(&scenario.bytes);
3081            if actual == normalize_sha256(expected).unwrap_or_default() {
3082                assert!(expected_ok, "expected sha256 match to succeed");
3083            } else {
3084                assert!(!expected_ok, "expected sha256 mismatch to fail");
3085            }
3086        }
3087    }
3088
3089    fn normalize_result(
3090        result: &Result<ResolvedComponent, ResolveError>,
3091    ) -> Result<ResolvedComponent, ResolveError> {
3092        match result {
3093            Ok(value) => Ok(value.clone()),
3094            Err(err) => Err(err.clone()),
3095        }
3096    }
3097
3098    fn scenario_strategy() -> impl Strategy<Value = Scenario> {
3099        let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
3100        let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
3101        let tag_ref = any::<bool>();
3102        let bundled = any::<bool>();
3103        let include_sha = any::<bool>();
3104        let include_component_id = any::<bool>();
3105        let request_by_id = any::<bool>();
3106        let use_lock = any::<bool>();
3107        let use_sources = any::<bool>();
3108        let bytes = prop::collection::vec(any::<u8>(), 1..64);
3109
3110        (
3111            name,
3112            alt_name,
3113            tag_ref,
3114            bundled,
3115            include_sha,
3116            include_component_id,
3117            request_by_id,
3118            use_lock,
3119            use_sources,
3120            bytes,
3121        )
3122            .prop_map(
3123                |(
3124                    name,
3125                    alt_name,
3126                    tag_ref,
3127                    bundled,
3128                    include_sha,
3129                    include_component_id,
3130                    request_by_id,
3131                    use_lock,
3132                    use_sources,
3133                    bytes,
3134                )| {
3135                    let component_id_str = if include_component_id {
3136                        alt_name.clone()
3137                    } else {
3138                        name.clone()
3139                    };
3140                    let component_id = ComponentId::from_str(&component_id_str).ok();
3141                    let source_ref = if tag_ref {
3142                        format!("oci://registry.test/{name}:v1")
3143                    } else {
3144                        format!(
3145                            "oci://registry.test/{name}@sha256:{}",
3146                            hex::encode([0x11u8; 32])
3147                        )
3148                    };
3149                    let expected_sha256 = if bundled && include_sha {
3150                        Some(compute_sha256_digest_for(&bytes))
3151                    } else {
3152                        None
3153                    };
3154
3155                    let lock_component = PackLockComponent {
3156                        name: name.clone(),
3157                        source_ref: Some(source_ref),
3158                        legacy_ref: None,
3159                        component_id,
3160                        bundled: Some(bundled),
3161                        bundled_path: if bundled {
3162                            Some(format!("components/{name}.wasm"))
3163                        } else {
3164                            None
3165                        },
3166                        legacy_path: None,
3167                        wasm_sha256: expected_sha256.clone(),
3168                        legacy_sha256: None,
3169                        resolved_digest: if bundled {
3170                            None
3171                        } else {
3172                            Some("sha256:deadbeef".to_string())
3173                        },
3174                        digest: None,
3175                    };
3176
3177                    let pack_lock = if use_lock {
3178                        Some(PackLockV1 {
3179                            schema_version: 1,
3180                            components: vec![lock_component],
3181                        })
3182                    } else {
3183                        None
3184                    };
3185
3186                    let component_sources = if use_sources {
3187                        Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
3188                            name: name.clone(),
3189                            component_id: ComponentId::from_str(&name).ok(),
3190                            source: ComponentSourceRef::from_str(
3191                                "oci://registry.test/component@sha256:deadbeef",
3192                            )
3193                            .expect("component ref"),
3194                            resolved: ResolvedComponentV1 {
3195                                digest: "sha256:deadbeef".to_string(),
3196                                signature: None,
3197                                signed_by: None,
3198                            },
3199                            artifact: if bundled {
3200                                ArtifactLocationV1::Inline {
3201                                    wasm_path: format!("components/{name}.wasm"),
3202                                    manifest_path: None,
3203                                }
3204                            } else {
3205                                ArtifactLocationV1::Remote
3206                            },
3207                            licensing_hint: None,
3208                            metering_hint: None,
3209                        }]))
3210                    } else {
3211                        None
3212                    };
3213
3214                    let request = if request_by_id {
3215                        ResolveRequest::ById(component_id_str.clone())
3216                    } else {
3217                        ResolveRequest::ByName(name.clone())
3218                    };
3219
3220                    Scenario {
3221                        pack_lock,
3222                        component_sources,
3223                        request,
3224                        expected_sha256,
3225                        bytes,
3226                    }
3227                },
3228            )
3229    }
3230
3231    #[test]
3232    fn pack_resolution_proptest() {
3233        let seed = proptest_seed();
3234        run_cases(scenario_strategy(), proptest_config().cases, seed);
3235    }
3236
3237    #[test]
3238    fn pack_resolution_regression_seeds() {
3239        let seeds_path =
3240            Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
3241        let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
3242        for line in raw.lines() {
3243            let line = line.trim();
3244            if line.is_empty() || line.starts_with('#') {
3245                continue;
3246            }
3247            let seed = line.parse::<u64>().expect("seed must be an integer");
3248            let mut bytes = [0u8; 32];
3249            bytes[..8].copy_from_slice(&seed.to_le_bytes());
3250            run_cases(scenario_strategy(), 1, Some(bytes));
3251        }
3252    }
3253}
3254
3255fn locate_pack_assets(
3256    materialized_root: Option<&Path>,
3257    archive_hint: Option<&Path>,
3258) -> Result<(Option<PathBuf>, Option<TempDir>)> {
3259    if let Some(root) = materialized_root {
3260        let assets = root.join("assets");
3261        if assets.is_dir() {
3262            return Ok((Some(assets), None));
3263        }
3264    }
3265    if let Some(path) = archive_hint
3266        && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
3267    {
3268        return Ok((Some(assets), Some(tempdir)));
3269    }
3270    Ok((None, None))
3271}
3272
3273fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
3274    let file =
3275        File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
3276    let mut archive =
3277        ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
3278    let temp = TempDir::new().context("failed to create temporary assets directory")?;
3279    let mut found = false;
3280    for idx in 0..archive.len() {
3281        let mut entry = archive.by_index(idx)?;
3282        let name = entry.name();
3283        if !name.starts_with("assets/") {
3284            continue;
3285        }
3286        let dest = temp.path().join(name);
3287        if name.ends_with('/') {
3288            std::fs::create_dir_all(&dest)?;
3289            found = true;
3290            continue;
3291        }
3292        if let Some(parent) = dest.parent() {
3293            std::fs::create_dir_all(parent)?;
3294        }
3295        let mut outfile = std::fs::File::create(&dest)?;
3296        std::io::copy(&mut entry, &mut outfile)?;
3297        found = true;
3298    }
3299    if found {
3300        let assets_path = temp.path().join("assets");
3301        Ok(Some((temp, assets_path)))
3302    } else {
3303        Ok(None)
3304    }
3305}
3306
3307fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
3308    let mut opts = DistOptions {
3309        allow_tags: true,
3310        ..DistOptions::default()
3311    };
3312    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
3313        opts.cache_dir = cache_dir;
3314    }
3315    if component_resolution.dist_offline {
3316        opts.offline = true;
3317    }
3318    opts
3319}
3320
3321#[allow(clippy::too_many_arguments)]
3322async fn load_components_from_sources(
3323    cache: &CacheManager,
3324    engine: &Engine,
3325    component_sources: &HashMap<String, ComponentSourceInfo>,
3326    component_resolution: &ComponentResolution,
3327    specs: &[ComponentSpec],
3328    missing: &mut HashSet<String>,
3329    into: &mut HashMap<String, PackComponent>,
3330    materialized_root: Option<&Path>,
3331    archive_hint: Option<&Path>,
3332) -> Result<()> {
3333    let mut archive = if let Some(path) = archive_hint {
3334        Some(
3335            ZipArchive::new(File::open(path)?)
3336                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
3337        )
3338    } else {
3339        None
3340    };
3341    let mut dist_client: Option<DistClient> = None;
3342
3343    for spec in specs {
3344        if !missing.contains(&spec.id) {
3345            continue;
3346        }
3347        let Some(source) = component_sources.get(&spec.id) else {
3348            continue;
3349        };
3350
3351        let bytes = match &source.artifact {
3352            ComponentArtifactLocation::Inline { wasm_path } => {
3353                if let Some(root) = materialized_root {
3354                    let path = root.join(wasm_path);
3355                    if path.exists() {
3356                        std::fs::read(&path).with_context(|| {
3357                            format!(
3358                                "failed to read inline component {} from {}",
3359                                spec.id,
3360                                path.display()
3361                            )
3362                        })?
3363                    } else if archive.is_none() {
3364                        bail!("inline component {} missing at {}", spec.id, path.display());
3365                    } else {
3366                        read_entry(
3367                            archive.as_mut().expect("archive present when needed"),
3368                            wasm_path,
3369                        )
3370                        .with_context(|| {
3371                            format!(
3372                                "inline component {} missing at {} in pack archive",
3373                                spec.id, wasm_path
3374                            )
3375                        })?
3376                    }
3377                } else if let Some(archive) = archive.as_mut() {
3378                    read_entry(archive, wasm_path).with_context(|| {
3379                        format!(
3380                            "inline component {} missing at {} in pack archive",
3381                            spec.id, wasm_path
3382                        )
3383                    })?
3384                } else {
3385                    bail!(
3386                        "inline component {} missing and no pack source available",
3387                        spec.id
3388                    );
3389                }
3390            }
3391            ComponentArtifactLocation::Remote => {
3392                if source.source.is_tag() {
3393                    bail!(
3394                        "component {} uses tag ref {} but is not bundled; rebuild the pack",
3395                        spec.id,
3396                        source.source
3397                    );
3398                }
3399                let client = dist_client.get_or_insert_with(|| {
3400                    DistClient::new(dist_options_from(component_resolution))
3401                });
3402                let reference = source.source.to_string();
3403                fault::maybe_fail_asset(&reference)
3404                    .await
3405                    .with_context(|| format!("fault injection blocked asset {reference}"))?;
3406                let digest = source.digest.as_deref().ok_or_else(|| {
3407                    anyhow!(
3408                        "component {} missing expected digest for remote component",
3409                        spec.id
3410                    )
3411                })?;
3412                let cache_path = if component_resolution.dist_offline {
3413                    client
3414                        .fetch_digest(digest)
3415                        .await
3416                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
3417                } else {
3418                    let resolved = client
3419                        .resolve_ref(&reference)
3420                        .await
3421                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3422                    let expected = normalize_digest(digest);
3423                    let actual = normalize_digest(&resolved.digest);
3424                    if expected != actual {
3425                        bail!(
3426                            "component {} digest mismatch after fetch: expected {}, got {}",
3427                            spec.id,
3428                            expected,
3429                            actual
3430                        );
3431                    }
3432                    resolved.cache_path.ok_or_else(|| {
3433                        anyhow!(
3434                            "component {} resolved from {} but cache path is missing",
3435                            spec.id,
3436                            reference
3437                        )
3438                    })?
3439                };
3440                std::fs::read(&cache_path).with_context(|| {
3441                    format!(
3442                        "failed to read cached component {} from {}",
3443                        spec.id,
3444                        cache_path.display()
3445                    )
3446                })?
3447            }
3448        };
3449
3450        if let Some(expected) = source.expected_wasm_sha256.as_deref() {
3451            verify_wasm_sha256(&spec.id, expected, &bytes)?;
3452        } else if source.skip_digest_verification {
3453            let actual = compute_sha256_digest_for(&bytes);
3454            warn!(
3455                component_id = %spec.id,
3456                digest = %actual,
3457                "bundled component missing wasm_sha256; allowing due to flag"
3458            );
3459        } else {
3460            let expected = source.digest.as_deref().ok_or_else(|| {
3461                anyhow!(
3462                    "component {} missing expected digest for verification",
3463                    spec.id
3464                )
3465            })?;
3466            verify_component_digest(&spec.id, expected, &bytes)?;
3467        }
3468        let component =
3469            compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
3470                .await
3471                .with_context(|| format!("failed to compile component {}", spec.id))?;
3472        into.insert(
3473            spec.id.clone(),
3474            PackComponent {
3475                name: spec.id.clone(),
3476                version: spec.version.clone(),
3477                component,
3478            },
3479        );
3480        missing.remove(&spec.id);
3481    }
3482
3483    Ok(())
3484}
3485
3486fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3487    match err {
3488        DistError::CacheMiss { reference: missing } => anyhow!(
3489            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3490            component_id,
3491            missing,
3492            reference
3493        ),
3494        DistError::Offline { reference: blocked } => anyhow!(
3495            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3496            component_id,
3497            blocked,
3498            reference
3499        ),
3500        DistError::AuthRequired { target } => anyhow!(
3501            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3502            component_id,
3503            target,
3504            reference
3505        ),
3506        other => anyhow!(
3507            "failed to resolve component {} from {}: {}",
3508            component_id,
3509            reference,
3510            other
3511        ),
3512    }
3513}
3514
3515async fn load_components_from_overrides(
3516    cache: &CacheManager,
3517    engine: &Engine,
3518    overrides: &HashMap<String, PathBuf>,
3519    specs: &[ComponentSpec],
3520    missing: &mut HashSet<String>,
3521    into: &mut HashMap<String, PackComponent>,
3522) -> Result<()> {
3523    for spec in specs {
3524        if !missing.contains(&spec.id) {
3525            continue;
3526        }
3527        let Some(path) = overrides.get(&spec.id) else {
3528            continue;
3529        };
3530        let bytes = std::fs::read(path)
3531            .with_context(|| format!("failed to read override component {}", path.display()))?;
3532        let component = compile_component_with_cache(cache, engine, None, bytes)
3533            .await
3534            .with_context(|| {
3535                format!(
3536                    "failed to compile component {} from override {}",
3537                    spec.id,
3538                    path.display()
3539                )
3540            })?;
3541        into.insert(
3542            spec.id.clone(),
3543            PackComponent {
3544                name: spec.id.clone(),
3545                version: spec.version.clone(),
3546                component,
3547            },
3548        );
3549        missing.remove(&spec.id);
3550    }
3551    Ok(())
3552}
3553
3554async fn load_components_from_dir(
3555    cache: &CacheManager,
3556    engine: &Engine,
3557    root: &Path,
3558    specs: &[ComponentSpec],
3559    missing: &mut HashSet<String>,
3560    into: &mut HashMap<String, PackComponent>,
3561) -> Result<()> {
3562    for spec in specs {
3563        if !missing.contains(&spec.id) {
3564            continue;
3565        }
3566        let path = component_path_for_spec(root, spec);
3567        if !path.exists() {
3568            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3569            continue;
3570        }
3571        let bytes = std::fs::read(&path)
3572            .with_context(|| format!("failed to read component {}", path.display()))?;
3573        let component = compile_component_with_cache(cache, engine, None, bytes)
3574            .await
3575            .with_context(|| {
3576                format!(
3577                    "failed to compile component {} from {}",
3578                    spec.id,
3579                    path.display()
3580                )
3581            })?;
3582        into.insert(
3583            spec.id.clone(),
3584            PackComponent {
3585                name: spec.id.clone(),
3586                version: spec.version.clone(),
3587                component,
3588            },
3589        );
3590        missing.remove(&spec.id);
3591    }
3592    Ok(())
3593}
3594
3595async fn load_components_from_archive(
3596    cache: &CacheManager,
3597    engine: &Engine,
3598    path: &Path,
3599    specs: &[ComponentSpec],
3600    missing: &mut HashSet<String>,
3601    into: &mut HashMap<String, PackComponent>,
3602) -> Result<()> {
3603    let mut archive = ZipArchive::new(File::open(path)?)
3604        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3605    for spec in specs {
3606        if !missing.contains(&spec.id) {
3607            continue;
3608        }
3609        let file_name = spec
3610            .legacy_path
3611            .clone()
3612            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3613        let bytes = match read_entry(&mut archive, &file_name) {
3614            Ok(bytes) => bytes,
3615            Err(err) => {
3616                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3617                continue;
3618            }
3619        };
3620        let component = compile_component_with_cache(cache, engine, None, bytes)
3621            .await
3622            .with_context(|| format!("failed to compile component {}", spec.id))?;
3623        into.insert(
3624            spec.id.clone(),
3625            PackComponent {
3626                name: spec.id.clone(),
3627                version: spec.version.clone(),
3628                component,
3629            },
3630        );
3631        missing.remove(&spec.id);
3632    }
3633    Ok(())
3634}
3635
3636#[cfg(test)]
3637mod tests {
3638    use super::*;
3639    use greentic_flow::model::{FlowDoc, NodeDoc};
3640    use indexmap::IndexMap;
3641    use serde_json::json;
3642
3643    #[test]
3644    fn normalizes_raw_component_to_component_exec() {
3645        let mut nodes = IndexMap::new();
3646        let mut raw = IndexMap::new();
3647        raw.insert(
3648            "templating.handlebars".into(),
3649            json!({ "template": "Hi {{name}}" }),
3650        );
3651        nodes.insert(
3652            "start".into(),
3653            NodeDoc {
3654                raw,
3655                routing: json!([{"out": true}]),
3656                ..Default::default()
3657            },
3658        );
3659        let doc = FlowDoc {
3660            id: "welcome".into(),
3661            title: None,
3662            description: None,
3663            flow_type: "messaging".into(),
3664            start: Some("start".into()),
3665            parameters: json!({}),
3666            tags: Vec::new(),
3667            schema_version: None,
3668            entrypoints: IndexMap::new(),
3669            nodes,
3670        };
3671
3672        let normalized = normalize_flow_doc(doc);
3673        let node = normalized.nodes.get("start").expect("node exists");
3674        assert_eq!(node.operation.as_deref(), Some("component.exec"));
3675        assert!(node.raw.is_empty());
3676        let payload = node.payload.as_object().expect("payload object");
3677        assert_eq!(
3678            payload.get("component"),
3679            Some(&Value::String("templating.handlebars".into()))
3680        );
3681        assert_eq!(
3682            payload.get("operation"),
3683            Some(&Value::String("render".into()))
3684        );
3685        let input = payload.get("input").unwrap();
3686        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
3687    }
3688}
3689
3690#[derive(Clone, Debug, Default, Serialize, Deserialize)]
3691pub struct PackMetadata {
3692    pub pack_id: String,
3693    pub version: String,
3694    #[serde(default)]
3695    pub entry_flows: Vec<String>,
3696    #[serde(default)]
3697    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
3698}
3699
3700impl PackMetadata {
3701    fn from_wasm(bytes: &[u8]) -> Option<Self> {
3702        let parser = Parser::new(0);
3703        for payload in parser.parse_all(bytes) {
3704            let payload = payload.ok()?;
3705            match payload {
3706                Payload::CustomSection(section) => {
3707                    if section.name() == "greentic.manifest"
3708                        && let Ok(meta) = Self::from_bytes(section.data())
3709                    {
3710                        return Some(meta);
3711                    }
3712                }
3713                Payload::DataSection(reader) => {
3714                    for segment in reader.into_iter().flatten() {
3715                        if let Ok(meta) = Self::from_bytes(segment.data) {
3716                            return Some(meta);
3717                        }
3718                    }
3719                }
3720                _ => {}
3721            }
3722        }
3723        None
3724    }
3725
3726    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
3727        #[derive(Deserialize)]
3728        struct RawManifest {
3729            pack_id: String,
3730            version: String,
3731            #[serde(default)]
3732            entry_flows: Vec<String>,
3733            #[serde(default)]
3734            flows: Vec<RawFlow>,
3735            #[serde(default)]
3736            secret_requirements: Vec<greentic_types::SecretRequirement>,
3737        }
3738
3739        #[derive(Deserialize)]
3740        struct RawFlow {
3741            id: String,
3742        }
3743
3744        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
3745        let mut entry_flows = if manifest.entry_flows.is_empty() {
3746            manifest.flows.iter().map(|f| f.id.clone()).collect()
3747        } else {
3748            manifest.entry_flows.clone()
3749        };
3750        entry_flows.retain(|id| !id.is_empty());
3751        Ok(Self {
3752            pack_id: manifest.pack_id,
3753            version: manifest.version,
3754            entry_flows,
3755            secret_requirements: manifest.secret_requirements,
3756        })
3757    }
3758
3759    pub fn fallback(path: &Path) -> Self {
3760        let pack_id = path
3761            .file_stem()
3762            .map(|s| s.to_string_lossy().into_owned())
3763            .unwrap_or_else(|| "unknown-pack".to_string());
3764        Self {
3765            pack_id,
3766            version: "0.0.0".to_string(),
3767            entry_flows: Vec::new(),
3768            secret_requirements: Vec::new(),
3769        }
3770    }
3771
3772    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
3773        let entry_flows = manifest
3774            .flows
3775            .iter()
3776            .map(|flow| flow.id.as_str().to_string())
3777            .collect::<Vec<_>>();
3778        Self {
3779            pack_id: manifest.pack_id.as_str().to_string(),
3780            version: manifest.version.to_string(),
3781            entry_flows,
3782            secret_requirements: manifest.secret_requirements.clone(),
3783        }
3784    }
3785}