greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21    self as host_v1, HostFns, add_all_v1_to_linker,
22    runner_host_http::RunnerHostHttp,
23    runner_host_kv::RunnerHostKv,
24    secrets_store::{SecretsError, SecretsStoreHost},
25    state_store::{
26        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27        StateStoreHost, TenantCtx as StateTenantCtx,
28    },
29    telemetry_logger::{
30        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32        TenantCtx as TelemetryTenantCtx,
33    },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
41    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
42    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
43    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44    pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69#[cfg(feature = "fault-injection")]
70use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
71
72use crate::config::HostConfig;
73use crate::fault;
74use crate::secrets::{DynSecretsManager, read_secret_blocking};
75use crate::storage::state::STATE_PREFIX;
76use crate::storage::{DynSessionStore, DynStateStore};
77use crate::verify;
78use crate::wasi::RunnerWasiPolicy;
79use tracing::warn;
80use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
81use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
82
83use greentic_flow::model::FlowDoc;
84
85#[allow(dead_code)]
86pub struct PackRuntime {
87    /// Component artifact path (wasm file).
88    path: PathBuf,
89    /// Optional archive (.gtpack) used to load flows/manifests.
90    archive_path: Option<PathBuf>,
91    config: Arc<HostConfig>,
92    engine: Engine,
93    metadata: PackMetadata,
94    manifest: Option<greentic_types::PackManifest>,
95    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
96    component_manifests: HashMap<String, ComponentManifest>,
97    mocks: Option<Arc<MockLayer>>,
98    flows: Option<PackFlows>,
99    components: HashMap<String, PackComponent>,
100    http_client: Arc<BlockingClient>,
101    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
102    session_store: Option<DynSessionStore>,
103    state_store: Option<DynStateStore>,
104    wasi_policy: Arc<RunnerWasiPolicy>,
105    provider_registry: RwLock<Option<ProviderRegistry>>,
106    secrets: DynSecretsManager,
107    oauth_config: Option<OAuthBrokerConfig>,
108    cache: CacheManager,
109}
110
111struct PackComponent {
112    #[allow(dead_code)]
113    name: String,
114    #[allow(dead_code)]
115    version: String,
116    component: Arc<Component>,
117}
118
/// Options that control where component artifacts are resolved from.
///
/// The derived `Default` leaves every option unset: no materialized root, no overrides,
/// online distribution, no cache directory, and hashes required.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Materialized pack directory root; it contains `manifest.cbor` and `components/`.
    pub materialized_root: Option<PathBuf>,
    /// Explicit per-component overrides: component id -> path of the wasm file to use.
    pub overrides: HashMap<String, PathBuf>,
    /// When `true`, remote components are never fetched and cached artifacts are required.
    pub dist_offline: bool,
    /// Cache directory for resolved remote components, if one is configured.
    pub dist_cache_dir: Option<PathBuf>,
    /// Dev-only escape hatch: accept bundled components that lack `wasm_sha256`.
    pub allow_missing_hash: bool,
}
132
133fn build_blocking_client() -> BlockingClient {
134    std::thread::spawn(|| {
135        BlockingClient::builder()
136            .no_proxy()
137            .build()
138            .expect("blocking client")
139    })
140    .join()
141    .expect("client build thread panicked")
142}
143
144fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
145    let (root, candidate) = if path.is_absolute() {
146        let parent = path
147            .parent()
148            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
149        let root = parent
150            .canonicalize()
151            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
152        let file = path
153            .file_name()
154            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
155        (root, PathBuf::from(file))
156    } else {
157        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
158        let base = if let Some(parent) = path.parent() {
159            cwd.join(parent)
160        } else {
161            cwd
162        };
163        let root = base
164            .canonicalize()
165            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
166        let file = path
167            .file_name()
168            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
169        (root, PathBuf::from(file))
170    };
171    let safe = normalize_under_root(&root, &candidate)?;
172    Ok((root, safe))
173}
174
175static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct FlowDescriptor {
179    pub id: String,
180    #[serde(rename = "type")]
181    pub flow_type: String,
182    pub pack_id: String,
183    pub profile: String,
184    pub version: String,
185    #[serde(default)]
186    pub description: Option<String>,
187}
188
189pub struct HostState {
190    #[allow(dead_code)]
191    pack_id: String,
192    config: Arc<HostConfig>,
193    http_client: Arc<BlockingClient>,
194    default_env: String,
195    #[allow(dead_code)]
196    session_store: Option<DynSessionStore>,
197    state_store: Option<DynStateStore>,
198    mocks: Option<Arc<MockLayer>>,
199    secrets: DynSecretsManager,
200    oauth_config: Option<OAuthBrokerConfig>,
201    oauth_host: OAuthBrokerHost,
202    exec_ctx: Option<ComponentExecCtx>,
203}
204
205impl HostState {
206    #[allow(clippy::default_constructed_unit_structs)]
207    #[allow(clippy::too_many_arguments)]
208    pub fn new(
209        pack_id: String,
210        config: Arc<HostConfig>,
211        http_client: Arc<BlockingClient>,
212        mocks: Option<Arc<MockLayer>>,
213        session_store: Option<DynSessionStore>,
214        state_store: Option<DynStateStore>,
215        secrets: DynSecretsManager,
216        oauth_config: Option<OAuthBrokerConfig>,
217        exec_ctx: Option<ComponentExecCtx>,
218    ) -> Result<Self> {
219        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
220        Ok(Self {
221            pack_id,
222            config,
223            http_client,
224            default_env,
225            session_store,
226            state_store,
227            mocks,
228            secrets,
229            oauth_config,
230            oauth_host: OAuthBrokerHost::default(),
231            exec_ctx,
232        })
233    }
234
235    pub fn get_secret(&self, key: &str) -> Result<String> {
236        if provider_core_only::is_enabled() {
237            bail!(provider_core_only::blocked_message("secrets"))
238        }
239        if !self.config.secrets_policy.is_allowed(key) {
240            bail!("secret {key} is not permitted by bindings policy");
241        }
242        if let Some(mock) = &self.mocks
243            && let Some(value) = mock.secrets_lookup(key)
244        {
245            return Ok(value);
246        }
247        let ctx = self.config.tenant_ctx();
248        let bytes = read_secret_blocking(&self.secrets, &ctx, key)
249            .context("failed to read secret from manager")?;
250        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
251        Ok(value)
252    }
253
254    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
255        let tenant_raw = ctx
256            .as_ref()
257            .map(|ctx| ctx.tenant.clone())
258            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
259            .unwrap_or_else(|| self.config.tenant.clone());
260        let env_raw = ctx
261            .as_ref()
262            .map(|ctx| ctx.env.clone())
263            .unwrap_or_else(|| self.default_env.clone());
264        let tenant_id = TenantId::from_str(&tenant_raw)
265            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
266        let env_id = EnvId::from_str(&env_raw)
267            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
268        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
269        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
270            if let Some(team) = exec_ctx.tenant.team.as_ref() {
271                let team_id =
272                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
273                tenant_ctx = tenant_ctx.with_team(Some(team_id));
274            }
275            if let Some(user) = exec_ctx.tenant.user.as_ref() {
276                let user_id =
277                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
278                tenant_ctx = tenant_ctx.with_user(Some(user_id));
279            }
280            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
281            if let Some(node) = exec_ctx.node_id.as_ref() {
282                tenant_ctx = tenant_ctx.with_node(node.clone());
283            }
284            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
285                tenant_ctx = tenant_ctx.with_session(session.clone());
286            }
287            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
288        }
289
290        if let Some(ctx) = ctx {
291            if let Some(team) = ctx.team.or(ctx.team_id) {
292                let team_id =
293                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
294                tenant_ctx = tenant_ctx.with_team(Some(team_id));
295            }
296            if let Some(user) = ctx.user.or(ctx.user_id) {
297                let user_id =
298                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
299                tenant_ctx = tenant_ctx.with_user(Some(user_id));
300            }
301            if let Some(flow) = ctx.flow_id {
302                tenant_ctx = tenant_ctx.with_flow(flow);
303            }
304            if let Some(node) = ctx.node_id {
305                tenant_ctx = tenant_ctx.with_node(node);
306            }
307            if let Some(provider) = ctx.provider_id {
308                tenant_ctx = tenant_ctx.with_provider(provider);
309            }
310            if let Some(session) = ctx.session_id {
311                tenant_ctx = tenant_ctx.with_session(session);
312            }
313            tenant_ctx.trace_id = ctx.trace_id;
314        }
315        Ok(tenant_ctx)
316    }
317
318    fn send_http_request(
319        &mut self,
320        req: HttpRequest,
321        opts: Option<HttpRequestOptionsV1_1>,
322        _ctx: Option<HttpTenantCtx>,
323    ) -> Result<HttpResponse, HttpClientError> {
324        if !self.config.http_enabled {
325            return Err(HttpClientError {
326                code: "denied".into(),
327                message: "http client disabled by policy".into(),
328            });
329        }
330
331        let mut mock_state = None;
332        let raw_body = req.body.clone();
333        if let Some(mock) = &self.mocks
334            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
335        {
336            match mock.http_begin(&meta) {
337                HttpDecision::Mock(response) => {
338                    let headers = response
339                        .headers
340                        .iter()
341                        .map(|(k, v)| (k.clone(), v.clone()))
342                        .collect();
343                    return Ok(HttpResponse {
344                        status: response.status,
345                        headers,
346                        body: response.body.clone().map(|b| b.into_bytes()),
347                    });
348                }
349                HttpDecision::Deny(reason) => {
350                    return Err(HttpClientError {
351                        code: "denied".into(),
352                        message: reason,
353                    });
354                }
355                HttpDecision::Passthrough { record } => {
356                    mock_state = Some((meta, record));
357                }
358            }
359        }
360
361        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
362        let mut builder = self.http_client.request(method, &req.url);
363        for (key, value) in req.headers {
364            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
365                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
366            {
367                builder = builder.header(header, header_value);
368            }
369        }
370
371        if let Some(body) = raw_body.clone() {
372            builder = builder.body(body);
373        }
374
375        if let Some(opts) = opts {
376            if let Some(timeout_ms) = opts.timeout_ms {
377                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
378            }
379            if opts.allow_insecure == Some(true) {
380                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
381            }
382            if let Some(follow_redirects) = opts.follow_redirects
383                && !follow_redirects
384            {
385                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
386            }
387        }
388
389        let response = match builder.send() {
390            Ok(resp) => resp,
391            Err(err) => {
392                warn!(url = %req.url, error = %err, "http client request failed");
393                return Err(HttpClientError {
394                    code: "unavailable".into(),
395                    message: err.to_string(),
396                });
397            }
398        };
399
400        let status = response.status().as_u16();
401        let headers_vec = response
402            .headers()
403            .iter()
404            .map(|(k, v)| {
405                (
406                    k.as_str().to_string(),
407                    v.to_str().unwrap_or_default().to_string(),
408                )
409            })
410            .collect::<Vec<_>>();
411        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
412
413        if let Some((meta, true)) = mock_state.take()
414            && let Some(mock) = &self.mocks
415        {
416            let recorded = HttpMockResponse::new(
417                status,
418                headers_vec.clone().into_iter().collect(),
419                body_bytes
420                    .as_ref()
421                    .map(|b| String::from_utf8_lossy(b).into_owned()),
422            );
423            mock.http_record(&meta, &recorded);
424        }
425
426        Ok(HttpResponse {
427            status,
428            headers: headers_vec,
429            body: body_bytes,
430        })
431    }
432}
433
434impl SecretsStoreHost for HostState {
435    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
436        if provider_core_only::is_enabled() {
437            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
438            return Err(SecretsError::Denied);
439        }
440        if !self.config.secrets_policy.is_allowed(&key) {
441            return Err(SecretsError::Denied);
442        }
443        if let Some(mock) = &self.mocks
444            && let Some(value) = mock.secrets_lookup(&key)
445        {
446            return Ok(Some(value.into_bytes()));
447        }
448        let ctx = self.config.tenant_ctx();
449        match read_secret_blocking(&self.secrets, &ctx, &key) {
450            Ok(bytes) => Ok(Some(bytes)),
451            Err(err) => {
452                warn!(secret = %key, error = %err, "secret lookup failed");
453                Err(SecretsError::NotFound)
454            }
455        }
456    }
457}
458
459impl HttpClientHost for HostState {
460    fn send(
461        &mut self,
462        req: HttpRequest,
463        ctx: Option<HttpTenantCtx>,
464    ) -> Result<HttpResponse, HttpClientError> {
465        self.send_http_request(req, None, ctx)
466    }
467}
468
469impl HttpClientHostV1_1 for HostState {
470    fn send(
471        &mut self,
472        req: HttpRequestV1_1,
473        opts: Option<HttpRequestOptionsV1_1>,
474        ctx: Option<HttpTenantCtxV1_1>,
475    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
476        let legacy_req = HttpRequest {
477            method: req.method,
478            url: req.url,
479            headers: req.headers,
480            body: req.body,
481        };
482        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
483            env: ctx.env,
484            tenant: ctx.tenant,
485            tenant_id: ctx.tenant_id,
486            team: ctx.team,
487            team_id: ctx.team_id,
488            user: ctx.user,
489            user_id: ctx.user_id,
490            trace_id: ctx.trace_id,
491            correlation_id: ctx.correlation_id,
492            attributes: ctx.attributes,
493            session_id: ctx.session_id,
494            flow_id: ctx.flow_id,
495            node_id: ctx.node_id,
496            provider_id: ctx.provider_id,
497            deadline_ms: ctx.deadline_ms,
498            attempt: ctx.attempt,
499            idempotency_key: ctx.idempotency_key,
500            impersonation: ctx
501                .impersonation
502                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
503                    actor_id,
504                    reason,
505                }),
506        });
507
508        self.send_http_request(legacy_req, opts, legacy_ctx)
509            .map(|resp| HttpResponseV1_1 {
510                status: resp.status,
511                headers: resp.headers,
512                body: resp.body,
513            })
514            .map_err(|err| HttpClientErrorV1_1 {
515                code: err.code,
516                message: err.message,
517            })
518    }
519}
520
521impl StateStoreHost for HostState {
522    fn read(
523        &mut self,
524        key: HostStateKey,
525        ctx: Option<StateTenantCtx>,
526    ) -> Result<Vec<u8>, StateError> {
527        let store = match self.state_store.as_ref() {
528            Some(store) => store.clone(),
529            None => {
530                return Err(StateError {
531                    code: "unavailable".into(),
532                    message: "state store not configured".into(),
533                });
534            }
535        };
536        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
537            Ok(ctx) => ctx,
538            Err(err) => {
539                return Err(StateError {
540                    code: "invalid-ctx".into(),
541                    message: err.to_string(),
542                });
543            }
544        };
545        #[cfg(feature = "fault-injection")]
546        {
547            let exec_ctx = self.exec_ctx.as_ref();
548            let flow_id = exec_ctx
549                .map(|ctx| ctx.flow_id.as_str())
550                .unwrap_or("unknown");
551            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
552            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
553            let fault_ctx = FaultContext {
554                pack_id: self.pack_id.as_str(),
555                flow_id,
556                node_id,
557                attempt,
558            };
559            if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
560                return Err(StateError {
561                    code: "internal".into(),
562                    message: err.to_string(),
563                });
564            }
565        }
566        let key = StoreStateKey::from(key);
567        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
568            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
569            Ok(None) => Err(StateError {
570                code: "not_found".into(),
571                message: "state key not found".into(),
572            }),
573            Err(err) => Err(StateError {
574                code: "internal".into(),
575                message: err.to_string(),
576            }),
577        }
578    }
579
580    fn write(
581        &mut self,
582        key: HostStateKey,
583        bytes: Vec<u8>,
584        ctx: Option<StateTenantCtx>,
585    ) -> Result<StateOpAck, StateError> {
586        let store = match self.state_store.as_ref() {
587            Some(store) => store.clone(),
588            None => {
589                return Err(StateError {
590                    code: "unavailable".into(),
591                    message: "state store not configured".into(),
592                });
593            }
594        };
595        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
596            Ok(ctx) => ctx,
597            Err(err) => {
598                return Err(StateError {
599                    code: "invalid-ctx".into(),
600                    message: err.to_string(),
601                });
602            }
603        };
604        #[cfg(feature = "fault-injection")]
605        {
606            let exec_ctx = self.exec_ctx.as_ref();
607            let flow_id = exec_ctx
608                .map(|ctx| ctx.flow_id.as_str())
609                .unwrap_or("unknown");
610            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
611            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
612            let fault_ctx = FaultContext {
613                pack_id: self.pack_id.as_str(),
614                flow_id,
615                node_id,
616                attempt,
617            };
618            if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
619                return Err(StateError {
620                    code: "internal".into(),
621                    message: err.to_string(),
622                });
623            }
624        }
625        let key = StoreStateKey::from(key);
626        let value = serde_json::from_slice(&bytes)
627            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
628        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
629            Ok(()) => Ok(StateOpAck::Ok),
630            Err(err) => Err(StateError {
631                code: "internal".into(),
632                message: err.to_string(),
633            }),
634        }
635    }
636
637    fn delete(
638        &mut self,
639        key: HostStateKey,
640        ctx: Option<StateTenantCtx>,
641    ) -> Result<StateOpAck, StateError> {
642        let store = match self.state_store.as_ref() {
643            Some(store) => store.clone(),
644            None => {
645                return Err(StateError {
646                    code: "unavailable".into(),
647                    message: "state store not configured".into(),
648                });
649            }
650        };
651        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
652            Ok(ctx) => ctx,
653            Err(err) => {
654                return Err(StateError {
655                    code: "invalid-ctx".into(),
656                    message: err.to_string(),
657                });
658            }
659        };
660        let key = StoreStateKey::from(key);
661        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
662            Ok(_) => Ok(StateOpAck::Ok),
663            Err(err) => Err(StateError {
664                code: "internal".into(),
665                message: err.to_string(),
666            }),
667        }
668    }
669}
670
671impl TelemetryLoggerHost for HostState {
672    fn log(
673        &mut self,
674        span: TelemetrySpanContext,
675        fields: Vec<(String, String)>,
676        _ctx: Option<TelemetryTenantCtx>,
677    ) -> Result<TelemetryAck, TelemetryError> {
678        if let Some(mock) = &self.mocks
679            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
680        {
681            return Ok(TelemetryAck::Ok);
682        }
683        let mut map = serde_json::Map::new();
684        for (k, v) in fields {
685            map.insert(k, Value::String(v));
686        }
687        tracing::info!(
688            tenant = %span.tenant,
689            flow_id = %span.flow_id,
690            node = ?span.node_id,
691            provider = %span.provider,
692            fields = %serde_json::Value::Object(map.clone()),
693            "telemetry log from pack"
694        );
695        Ok(TelemetryAck::Ok)
696    }
697}
698
699impl RunnerHostHttp for HostState {
700    fn request(
701        &mut self,
702        method: String,
703        url: String,
704        headers: Vec<String>,
705        body: Option<Vec<u8>>,
706    ) -> Result<Vec<u8>, String> {
707        let req = HttpRequest {
708            method,
709            url,
710            headers: headers
711                .chunks(2)
712                .filter_map(|chunk| {
713                    if chunk.len() == 2 {
714                        Some((chunk[0].clone(), chunk[1].clone()))
715                    } else {
716                        None
717                    }
718                })
719                .collect(),
720            body,
721        };
722        match HttpClientHost::send(self, req, None) {
723            Ok(resp) => Ok(resp.body.unwrap_or_default()),
724            Err(err) => Err(err.message),
725        }
726    }
727}
728
729impl RunnerHostKv for HostState {
730    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
731        None
732    }
733
734    fn put(&mut self, _ns: String, _key: String, _val: String) {}
735}
736
737enum ManifestLoad {
738    New {
739        manifest: Box<greentic_types::PackManifest>,
740        flows: PackFlows,
741    },
742    Legacy {
743        manifest: Box<legacy_pack::PackManifest>,
744        flows: PackFlows,
745    },
746}
747
748fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
749    let mut archive = ZipArchive::new(File::open(path)?)
750        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
751    let bytes = read_entry(&mut archive, "manifest.cbor")
752        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
753    match decode_pack_manifest(&bytes) {
754        Ok(manifest) => {
755            let cache = PackFlows::from_manifest(manifest.clone());
756            Ok(ManifestLoad::New {
757                manifest: Box::new(manifest),
758                flows: cache,
759            })
760        }
761        Err(err) => {
762            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
763            // Fall back to legacy pack manifest
764            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
765                .context("failed to decode legacy pack manifest from manifest.cbor")?;
766            let flows = load_legacy_flows(&mut archive, &legacy)?;
767            Ok(ManifestLoad::Legacy {
768                manifest: Box::new(legacy),
769                flows,
770            })
771        }
772    }
773}
774
775fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
776    let manifest_path = root.join("manifest.cbor");
777    let bytes = std::fs::read(&manifest_path)
778        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
779    match decode_pack_manifest(&bytes) {
780        Ok(manifest) => {
781            let cache = PackFlows::from_manifest(manifest.clone());
782            Ok(ManifestLoad::New {
783                manifest: Box::new(manifest),
784                flows: cache,
785            })
786        }
787        Err(err) => {
788            tracing::debug!(
789                error = %err,
790                pack = %root.display(),
791                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
792            );
793            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
794                .context("failed to decode legacy pack manifest from manifest.cbor")?;
795            let flows = load_legacy_flows_from_dir(root, &legacy)?;
796            Ok(ManifestLoad::Legacy {
797                manifest: Box::new(legacy),
798                flows,
799            })
800        }
801    }
802}
803
804fn load_legacy_flows(
805    archive: &mut ZipArchive<File>,
806    manifest: &legacy_pack::PackManifest,
807) -> Result<PackFlows> {
808    let mut flows = HashMap::new();
809    let mut descriptors = Vec::new();
810
811    for entry in &manifest.flows {
812        let bytes = read_entry(archive, &entry.file_json)
813            .with_context(|| format!("missing flow json {}", entry.file_json))?;
814        let doc: FlowDoc = serde_json::from_slice(&bytes)
815            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
816        let normalized = normalize_flow_doc(doc);
817        let flow_ir = flow_doc_to_ir(normalized)?;
818        let flow = flow_ir_to_flow(flow_ir)?;
819
820        descriptors.push(FlowDescriptor {
821            id: entry.id.clone(),
822            flow_type: entry.kind.clone(),
823            pack_id: manifest.meta.pack_id.clone(),
824            profile: manifest.meta.pack_id.clone(),
825            version: manifest.meta.version.to_string(),
826            description: None,
827        });
828        flows.insert(entry.id.clone(), flow);
829    }
830
831    let mut entry_flows = manifest.meta.entry_flows.clone();
832    if entry_flows.is_empty() {
833        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
834    }
835    let metadata = PackMetadata {
836        pack_id: manifest.meta.pack_id.clone(),
837        version: manifest.meta.version.to_string(),
838        entry_flows,
839        secret_requirements: Vec::new(),
840    };
841
842    Ok(PackFlows {
843        descriptors,
844        flows,
845        metadata,
846    })
847}
848
849fn load_legacy_flows_from_dir(
850    root: &Path,
851    manifest: &legacy_pack::PackManifest,
852) -> Result<PackFlows> {
853    let mut flows = HashMap::new();
854    let mut descriptors = Vec::new();
855
856    for entry in &manifest.flows {
857        let path = root.join(&entry.file_json);
858        let bytes = std::fs::read(&path)
859            .with_context(|| format!("missing flow json {}", path.display()))?;
860        let doc: FlowDoc = serde_json::from_slice(&bytes)
861            .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
862        let normalized = normalize_flow_doc(doc);
863        let flow_ir = flow_doc_to_ir(normalized)?;
864        let flow = flow_ir_to_flow(flow_ir)?;
865
866        descriptors.push(FlowDescriptor {
867            id: entry.id.clone(),
868            flow_type: entry.kind.clone(),
869            pack_id: manifest.meta.pack_id.clone(),
870            profile: manifest.meta.pack_id.clone(),
871            version: manifest.meta.version.to_string(),
872            description: None,
873        });
874        flows.insert(entry.id.clone(), flow);
875    }
876
877    let mut entry_flows = manifest.meta.entry_flows.clone();
878    if entry_flows.is_empty() {
879        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
880    }
881    let metadata = PackMetadata {
882        pack_id: manifest.meta.pack_id.clone(),
883        version: manifest.meta.version.to_string(),
884        entry_flows,
885        secret_requirements: Vec::new(),
886    };
887
888    Ok(PackFlows {
889        descriptors,
890        flows,
891        metadata,
892    })
893}
894
/// Store data attached to each component instantiation.
///
/// Bundles the host-side services with the WASI context and resource table
/// that the `WasiView` implementation hands out.
pub struct ComponentState {
    /// Host services backing the guest-visible host interfaces.
    pub host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table exposed together with `wasi_ctx`.
    resource_table: ResourceTable,
}
900
901impl ComponentState {
902    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
903        let wasi_ctx = policy
904            .instantiate()
905            .context("failed to build WASI context")?;
906        Ok(Self {
907            host,
908            wasi_ctx,
909            resource_table: ResourceTable::new(),
910        })
911    }
912
913    fn host_mut(&mut self) -> &mut HostState {
914        &mut self.host
915    }
916
917    fn should_cancel_host(&mut self) -> bool {
918        false
919    }
920
921    fn yield_now_host(&mut self) {
922        // no-op cooperative yield
923    }
924}
925
/// `greentic:component/control` host for the v0.4 component API; both calls
/// delegate to the version-independent helpers on `ComponentState`.
impl component_api::v0_4::greentic::component::control::Host for ComponentState {
    /// Reports whether the guest should stop; delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Cooperative yield point; delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
935
/// `greentic:component/control` host for the v0.5 component API; both calls
/// delegate to the version-independent helpers on `ComponentState`.
impl component_api::v0_5::greentic::component::control::Host for ComponentState {
    /// Reports whether the guest should stop; delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Cooperative yield point; delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
945
946fn add_component_control_instance(
947    linker: &mut Linker<ComponentState>,
948    name: &str,
949) -> wasmtime::Result<()> {
950    let mut inst = linker.instance(name)?;
951    inst.func_wrap(
952        "should-cancel",
953        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
954            let host = caller.data_mut();
955            Ok((host.should_cancel_host(),))
956        },
957    )?;
958    inst.func_wrap(
959        "yield-now",
960        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
961            let host = caller.data_mut();
962            host.yield_now_host();
963            Ok(())
964        },
965    )?;
966    Ok(())
967}
968
969fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
970    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
971    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
972    Ok(())
973}
974
/// Registers the host imports that components link against.
///
/// Adds the WASI bindings first, then the v1 host interfaces through
/// `add_all_v1_to_linker`. Every interface that is wired resolves its host
/// implementation through `ComponentState::host_mut`.
///
/// `allow_state_store` decides whether the state-store interface is wired at
/// all. The OAuth broker interface is always left unset here.
///
/// # Errors
/// Propagates any failure from either registration call.
pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            // `None` when the caller does not permit state-store access.
            state_store: allow_state_store.then_some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
992
/// Exposes the OAuth-related parts of the host state to the OAuth broker glue.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// The host's default environment.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the OAuth broker host stored in the host state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// The OAuth broker configuration, if one was supplied.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
1010
1011impl WasiView for ComponentState {
1012    fn ctx(&mut self) -> WasiCtxView<'_> {
1013        WasiCtxView {
1014            ctx: &mut self.wasi_ctx,
1015            table: &mut self.resource_table,
1016        }
1017    }
1018}
1019
// SAFETY: NOTE(review): no justification is recorded for these impls. They
// assert that every field of `ComponentState` (`HostState`, `WasiCtx`,
// `ResourceTable`) may be sent to and shared between threads, which the
// compiler does not check here. TODO: confirm and document the invariant.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
1024
1025impl PackRuntime {
1026    fn allows_state_store(&self, component_ref: &str) -> bool {
1027        if self.state_store.is_none() {
1028            return false;
1029        }
1030        if !self.config.state_store_policy.allow {
1031            return false;
1032        }
1033        let Some(manifest) = self.component_manifests.get(component_ref) else {
1034            return false;
1035        };
1036        manifest
1037            .capabilities
1038            .host
1039            .state
1040            .as_ref()
1041            .map(|caps| caps.read || caps.write)
1042            .unwrap_or(false)
1043    }
1044
1045    #[allow(clippy::too_many_arguments)]
1046    pub async fn load(
1047        path: impl AsRef<Path>,
1048        config: Arc<HostConfig>,
1049        mocks: Option<Arc<MockLayer>>,
1050        archive_source: Option<&Path>,
1051        session_store: Option<DynSessionStore>,
1052        state_store: Option<DynStateStore>,
1053        wasi_policy: Arc<RunnerWasiPolicy>,
1054        secrets: DynSecretsManager,
1055        oauth_config: Option<OAuthBrokerConfig>,
1056        verify_archive: bool,
1057        component_resolution: ComponentResolution,
1058    ) -> Result<Self> {
1059        let path = path.as_ref();
1060        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1061        let path_meta = std::fs::metadata(&safe_path).ok();
1062        let is_dir = path_meta
1063            .as_ref()
1064            .map(|meta| meta.is_dir())
1065            .unwrap_or(false);
1066        let is_component = !is_dir
1067            && safe_path
1068                .extension()
1069                .and_then(|ext| ext.to_str())
1070                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1071                .unwrap_or(false);
1072        let archive_hint_path = if let Some(source) = archive_source {
1073            let (_, normalized) = normalize_pack_path(source)?;
1074            Some(normalized)
1075        } else if is_component || is_dir {
1076            None
1077        } else {
1078            Some(safe_path.clone())
1079        };
1080        let archive_hint = archive_hint_path.as_deref();
1081        if verify_archive {
1082            if let Some(verify_target) = archive_hint.and_then(|p| {
1083                std::fs::metadata(p)
1084                    .ok()
1085                    .filter(|meta| meta.is_file())
1086                    .map(|_| p)
1087            }) {
1088                verify::verify_pack(verify_target).await?;
1089                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1090            } else {
1091                tracing::debug!("skipping archive verification (no archive source)");
1092            }
1093        }
1094        let engine = Engine::default();
1095        let engine_profile =
1096            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1097        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1098        let mut metadata = PackMetadata::fallback(&safe_path);
1099        let mut manifest = None;
1100        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1101        let mut flows = None;
1102        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1103            if is_dir {
1104                Some(safe_path.clone())
1105            } else {
1106                None
1107            }
1108        });
1109
1110        if let Some(root) = materialized_root.as_ref() {
1111            match load_manifest_and_flows_from_dir(root) {
1112                Ok(ManifestLoad::New {
1113                    manifest: m,
1114                    flows: cache,
1115                }) => {
1116                    metadata = cache.metadata.clone();
1117                    manifest = Some(*m);
1118                    flows = Some(cache);
1119                }
1120                Ok(ManifestLoad::Legacy {
1121                    manifest: m,
1122                    flows: cache,
1123                }) => {
1124                    metadata = cache.metadata.clone();
1125                    legacy_manifest = Some(m);
1126                    flows = Some(cache);
1127                }
1128                Err(err) => {
1129                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1130                }
1131            }
1132        }
1133
1134        if manifest.is_none()
1135            && legacy_manifest.is_none()
1136            && let Some(archive_path) = archive_hint
1137        {
1138            match load_manifest_and_flows(archive_path) {
1139                Ok(ManifestLoad::New {
1140                    manifest: m,
1141                    flows: cache,
1142                }) => {
1143                    metadata = cache.metadata.clone();
1144                    manifest = Some(*m);
1145                    flows = Some(cache);
1146                }
1147                Ok(ManifestLoad::Legacy {
1148                    manifest: m,
1149                    flows: cache,
1150                }) => {
1151                    metadata = cache.metadata.clone();
1152                    legacy_manifest = Some(m);
1153                    flows = Some(cache);
1154                }
1155                Err(err) => {
1156                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1157                }
1158            }
1159        }
1160        #[cfg(feature = "fault-injection")]
1161        {
1162            let fault_ctx = FaultContext {
1163                pack_id: metadata.pack_id.as_str(),
1164                flow_id: "unknown",
1165                node_id: None,
1166                attempt: 1,
1167            };
1168            maybe_fail(FaultPoint::PackResolve, fault_ctx)
1169                .map_err(|err| anyhow!(err.to_string()))?;
1170        }
1171        let mut pack_lock = None;
1172        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1173            pack_lock = load_pack_lock(&root)?;
1174            if pack_lock.is_some() {
1175                break;
1176            }
1177        }
1178        let component_sources_payload = if pack_lock.is_none() {
1179            if let Some(manifest) = manifest.as_ref() {
1180                manifest
1181                    .get_component_sources_v1()
1182                    .context("invalid component sources extension")?
1183            } else {
1184                None
1185            }
1186        } else {
1187            None
1188        };
1189        let component_sources = if let Some(lock) = pack_lock.as_ref() {
1190            Some(component_sources_table_from_pack_lock(
1191                lock,
1192                component_resolution.allow_missing_hash,
1193            )?)
1194        } else {
1195            component_sources_table(component_sources_payload.as_ref())?
1196        };
1197        let components = if is_component {
1198            let wasm_bytes = fs::read(&safe_path).await?;
1199            metadata = PackMetadata::from_wasm(&wasm_bytes)
1200                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1201            let name = safe_path
1202                .file_stem()
1203                .map(|s| s.to_string_lossy().to_string())
1204                .unwrap_or_else(|| "component".to_string());
1205            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1206            let mut map = HashMap::new();
1207            map.insert(
1208                name.clone(),
1209                PackComponent {
1210                    name,
1211                    version: metadata.version.clone(),
1212                    component,
1213                },
1214            );
1215            map
1216        } else {
1217            let specs = component_specs(
1218                manifest.as_ref(),
1219                legacy_manifest.as_deref(),
1220                component_sources_payload.as_ref(),
1221                pack_lock.as_ref(),
1222            );
1223            if specs.is_empty() {
1224                HashMap::new()
1225            } else {
1226                let mut loaded = HashMap::new();
1227                let mut missing: HashSet<String> =
1228                    specs.iter().map(|spec| spec.id.clone()).collect();
1229                let mut searched = Vec::new();
1230
1231                if !component_resolution.overrides.is_empty() {
1232                    load_components_from_overrides(
1233                        &cache,
1234                        &engine,
1235                        &component_resolution.overrides,
1236                        &specs,
1237                        &mut missing,
1238                        &mut loaded,
1239                    )
1240                    .await?;
1241                    searched.push("override map".to_string());
1242                }
1243
1244                if let Some(component_sources) = component_sources.as_ref() {
1245                    load_components_from_sources(
1246                        &cache,
1247                        &engine,
1248                        component_sources,
1249                        &component_resolution,
1250                        &specs,
1251                        &mut missing,
1252                        &mut loaded,
1253                        materialized_root.as_deref(),
1254                        archive_hint,
1255                    )
1256                    .await?;
1257                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1258                }
1259
1260                if let Some(root) = materialized_root.as_ref() {
1261                    load_components_from_dir(
1262                        &cache,
1263                        &engine,
1264                        root,
1265                        &specs,
1266                        &mut missing,
1267                        &mut loaded,
1268                    )
1269                    .await?;
1270                    searched.push(format!("components dir {}", root.display()));
1271                }
1272
1273                if let Some(archive_path) = archive_hint {
1274                    load_components_from_archive(
1275                        &cache,
1276                        &engine,
1277                        archive_path,
1278                        &specs,
1279                        &mut missing,
1280                        &mut loaded,
1281                    )
1282                    .await?;
1283                    searched.push(format!("archive {}", archive_path.display()));
1284                }
1285
1286                if !missing.is_empty() {
1287                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1288                    let sources = if searched.is_empty() {
1289                        "no component sources".to_string()
1290                    } else {
1291                        searched.join(", ")
1292                    };
1293                    bail!(
1294                        "components missing: {}; looked in {}",
1295                        missing_list,
1296                        sources
1297                    );
1298                }
1299
1300                loaded
1301            }
1302        };
1303        let http_client = Arc::clone(&HTTP_CLIENT);
1304        let mut component_manifests = HashMap::new();
1305        if let Some(manifest) = manifest.as_ref() {
1306            for component in &manifest.components {
1307                component_manifests.insert(component.id.as_str().to_string(), component.clone());
1308            }
1309        }
1310        Ok(Self {
1311            path: safe_path,
1312            archive_path: archive_hint.map(Path::to_path_buf),
1313            config,
1314            engine,
1315            metadata,
1316            manifest,
1317            legacy_manifest,
1318            component_manifests,
1319            mocks,
1320            flows,
1321            components,
1322            http_client,
1323            pre_cache: Mutex::new(HashMap::new()),
1324            session_store,
1325            state_store,
1326            wasi_policy,
1327            provider_registry: RwLock::new(None),
1328            secrets,
1329            oauth_config,
1330            cache,
1331        })
1332    }
1333
1334    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1335        if let Some(cache) = &self.flows {
1336            return Ok(cache.descriptors.clone());
1337        }
1338        if let Some(manifest) = &self.manifest {
1339            let descriptors = manifest
1340                .flows
1341                .iter()
1342                .map(|flow| FlowDescriptor {
1343                    id: flow.id.as_str().to_string(),
1344                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1345                    pack_id: manifest.pack_id.as_str().to_string(),
1346                    profile: manifest.pack_id.as_str().to_string(),
1347                    version: manifest.version.to_string(),
1348                    description: None,
1349                })
1350                .collect();
1351            return Ok(descriptors);
1352        }
1353        Ok(Vec::new())
1354    }
1355
1356    #[allow(dead_code)]
1357    pub async fn run_flow(
1358        &self,
1359        flow_id: &str,
1360        input: serde_json::Value,
1361    ) -> Result<serde_json::Value> {
1362        let pack = Arc::new(
1363            PackRuntime::load(
1364                &self.path,
1365                Arc::clone(&self.config),
1366                self.mocks.clone(),
1367                self.archive_path.as_deref(),
1368                self.session_store.clone(),
1369                self.state_store.clone(),
1370                Arc::clone(&self.wasi_policy),
1371                self.secrets.clone(),
1372                self.oauth_config.clone(),
1373                false,
1374                ComponentResolution::default(),
1375            )
1376            .await?,
1377        );
1378
1379        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1380        let retry_config = self.config.retry_config().into();
1381        let mocks = pack.mocks.as_deref();
1382        let tenant = self.config.tenant.as_str();
1383
1384        let ctx = FlowContext {
1385            tenant,
1386            pack_id: pack.metadata().pack_id.as_str(),
1387            flow_id,
1388            node_id: None,
1389            tool: None,
1390            action: None,
1391            session_id: None,
1392            provider_id: None,
1393            retry_config,
1394            attempt: 1,
1395            observer: None,
1396            mocks,
1397        };
1398
1399        let execution = engine.execute(ctx, input).await?;
1400        match execution.status {
1401            FlowStatus::Completed => Ok(execution.output),
1402            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1403                "status": "pending",
1404                "reason": wait.reason,
1405                "resume": wait.snapshot,
1406                "response": execution.output,
1407            })),
1408        }
1409    }
1410
1411    pub async fn invoke_component(
1412        &self,
1413        component_ref: &str,
1414        ctx: ComponentExecCtx,
1415        operation: &str,
1416        _config_json: Option<String>,
1417        input_json: String,
1418    ) -> Result<Value> {
1419        let pack_component = self
1420            .components
1421            .get(component_ref)
1422            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1423
1424        let mut linker = Linker::new(&self.engine);
1425        let allow_state_store = self.allows_state_store(component_ref);
1426        register_all(&mut linker, allow_state_store)?;
1427        add_component_control_to_linker(&mut linker)?;
1428
1429        let host_state = HostState::new(
1430            self.metadata().pack_id.clone(),
1431            Arc::clone(&self.config),
1432            Arc::clone(&self.http_client),
1433            self.mocks.clone(),
1434            self.session_store.clone(),
1435            self.state_store.clone(),
1436            Arc::clone(&self.secrets),
1437            self.oauth_config.clone(),
1438            Some(ctx.clone()),
1439        )?;
1440        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1441        let mut store = wasmtime::Store::new(&self.engine, store_state);
1442        let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1443        let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1444            Ok(pre) => {
1445                let bindings = pre.instantiate_async(&mut store).await?;
1446                let node = bindings.greentic_component_node();
1447                let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1448                let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1449                component_api::invoke_result_from_v0_5(result)
1450            }
1451            Err(err) => {
1452                if is_missing_node_export(&err, "0.5.0") {
1453                    let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1454                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
1455                        component_api::v0_4::ComponentPre::new(pre_instance)?;
1456                    let bindings = pre.instantiate_async(&mut store).await?;
1457                    let node = bindings.greentic_component_node();
1458                    let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1459                    let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1460                    component_api::invoke_result_from_v0_4(result)
1461                } else {
1462                    return Err(err);
1463                }
1464            }
1465        };
1466
1467        match result {
1468            InvokeResult::Ok(body) => {
1469                if body.is_empty() {
1470                    return Ok(Value::Null);
1471                }
1472                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1473            }
1474            InvokeResult::Err(NodeError {
1475                code,
1476                message,
1477                retryable,
1478                backoff_ms,
1479                details,
1480            }) => {
1481                let mut obj = serde_json::Map::new();
1482                obj.insert("ok".into(), Value::Bool(false));
1483                let mut error = serde_json::Map::new();
1484                error.insert("code".into(), Value::String(code));
1485                error.insert("message".into(), Value::String(message));
1486                error.insert("retryable".into(), Value::Bool(retryable));
1487                if let Some(backoff) = backoff_ms {
1488                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1489                }
1490                if let Some(details) = details {
1491                    error.insert(
1492                        "details".into(),
1493                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1494                    );
1495                }
1496                obj.insert("error".into(), Value::Object(error));
1497                Ok(Value::Object(obj))
1498            }
1499        }
1500    }
1501
1502    pub fn resolve_provider(
1503        &self,
1504        provider_id: Option<&str>,
1505        provider_type: Option<&str>,
1506    ) -> Result<ProviderBinding> {
1507        let registry = self.provider_registry()?;
1508        registry.resolve(provider_id, provider_type)
1509    }
1510
1511    pub async fn invoke_provider(
1512        &self,
1513        binding: &ProviderBinding,
1514        ctx: ComponentExecCtx,
1515        op: &str,
1516        input_json: Vec<u8>,
1517    ) -> Result<Value> {
1518        let component_ref = &binding.component_ref;
1519        let pack_component = self
1520            .components
1521            .get(component_ref)
1522            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1523
1524        let mut linker = Linker::new(&self.engine);
1525        let allow_state_store = self.allows_state_store(component_ref);
1526        register_all(&mut linker, allow_state_store)?;
1527        add_component_control_to_linker(&mut linker)?;
1528        let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1529        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1530
1531        let host_state = HostState::new(
1532            self.metadata().pack_id.clone(),
1533            Arc::clone(&self.config),
1534            Arc::clone(&self.http_client),
1535            self.mocks.clone(),
1536            self.session_store.clone(),
1537            self.state_store.clone(),
1538            Arc::clone(&self.secrets),
1539            self.oauth_config.clone(),
1540            Some(ctx),
1541        )?;
1542        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1543        let mut store = wasmtime::Store::new(&self.engine, store_state);
1544        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1545        let provider = bindings.greentic_provider_core_schema_core_api();
1546
1547        let result = provider.call_invoke(&mut store, op, &input_json)?;
1548        deserialize_json_bytes(result)
1549    }
1550
1551    fn provider_registry(&self) -> Result<ProviderRegistry> {
1552        if let Some(registry) = self.provider_registry.read().clone() {
1553            return Ok(registry);
1554        }
1555        let manifest = self
1556            .manifest
1557            .as_ref()
1558            .context("pack manifest required for provider resolution")?;
1559        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1560        let registry = ProviderRegistry::new(
1561            manifest,
1562            self.state_store.clone(),
1563            &self.config.tenant,
1564            &env,
1565        )?;
1566        *self.provider_registry.write() = Some(registry.clone());
1567        Ok(registry)
1568    }
1569
1570    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1571        if let Some(cache) = &self.flows {
1572            return cache
1573                .flows
1574                .get(flow_id)
1575                .cloned()
1576                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1577        }
1578        if let Some(manifest) = &self.manifest {
1579            let entry = manifest
1580                .flows
1581                .iter()
1582                .find(|f| f.id.as_str() == flow_id)
1583                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1584            return Ok(entry.flow.clone());
1585        }
1586        bail!("flow '{flow_id}' not available (pack exports disabled)")
1587    }
1588
    /// Returns the pack metadata held by this runtime.
    pub fn metadata(&self) -> &PackMetadata {
        &self.metadata
    }
1592
    /// Returns the secret requirements recorded in the pack metadata.
    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
        &self.metadata.secret_requirements
    }
1596
1597    pub fn missing_secrets(
1598        &self,
1599        tenant_ctx: &TypesTenantCtx,
1600    ) -> Vec<greentic_types::SecretRequirement> {
1601        let env = tenant_ctx.env.as_str().to_string();
1602        let tenant = tenant_ctx.tenant.as_str().to_string();
1603        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1604        self.required_secrets()
1605            .iter()
1606            .filter(|req| {
1607                // scope must match current context if provided
1608                if let Some(scope) = &req.scope {
1609                    if scope.env != env {
1610                        return false;
1611                    }
1612                    if scope.tenant != tenant {
1613                        return false;
1614                    }
1615                    if let Some(ref team_req) = scope.team
1616                        && team.as_ref() != Some(team_req)
1617                    {
1618                        return false;
1619                    }
1620                }
1621                let ctx = self.config.tenant_ctx();
1622                read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1623            })
1624            .cloned()
1625            .collect()
1626    }
1627
1628    pub fn for_component_test(
1629        components: Vec<(String, PathBuf)>,
1630        flows: HashMap<String, FlowIR>,
1631        pack_id: &str,
1632        config: Arc<HostConfig>,
1633    ) -> Result<Self> {
1634        let engine = Engine::default();
1635        let engine_profile =
1636            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1637        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1638        let mut component_map = HashMap::new();
1639        for (name, path) in components {
1640            if !path.exists() {
1641                bail!("component artifact missing: {}", path.display());
1642            }
1643            let wasm_bytes = std::fs::read(&path)?;
1644            let component = Arc::new(
1645                Component::from_binary(&engine, &wasm_bytes)
1646                    .with_context(|| format!("failed to compile component {}", path.display()))?,
1647            );
1648            component_map.insert(
1649                name.clone(),
1650                PackComponent {
1651                    name,
1652                    version: "0.0.0".into(),
1653                    component,
1654                },
1655            );
1656        }
1657
1658        let mut flow_map = HashMap::new();
1659        let mut descriptors = Vec::new();
1660        for (id, ir) in flows {
1661            let flow_type = ir.flow_type.clone();
1662            let flow = flow_ir_to_flow(ir)?;
1663            flow_map.insert(id.clone(), flow);
1664            descriptors.push(FlowDescriptor {
1665                id: id.clone(),
1666                flow_type,
1667                pack_id: pack_id.to_string(),
1668                profile: "test".into(),
1669                version: "0.0.0".into(),
1670                description: None,
1671            });
1672        }
1673        let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1674        let metadata = PackMetadata {
1675            pack_id: pack_id.to_string(),
1676            version: "0.0.0".into(),
1677            entry_flows,
1678            secret_requirements: Vec::new(),
1679        };
1680        let flows_cache = PackFlows {
1681            descriptors: descriptors.clone(),
1682            flows: flow_map,
1683            metadata: metadata.clone(),
1684        };
1685
1686        Ok(Self {
1687            path: PathBuf::new(),
1688            archive_path: None,
1689            config,
1690            engine,
1691            metadata,
1692            manifest: None,
1693            legacy_manifest: None,
1694            component_manifests: HashMap::new(),
1695            mocks: None,
1696            flows: Some(flows_cache),
1697            components: component_map,
1698            http_client: Arc::clone(&HTTP_CLIENT),
1699            pre_cache: Mutex::new(HashMap::new()),
1700            session_store: None,
1701            state_store: None,
1702            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1703            provider_registry: RwLock::new(None),
1704            secrets: crate::secrets::default_manager()?,
1705            oauth_config: None,
1706            cache,
1707        })
1708    }
1709}
1710
1711fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1712    let message = err.to_string();
1713    message.contains("no exported instance named")
1714        && message.contains(&format!("greentic:component/node@{version}"))
1715}
1716
/// Flows of a pack together with the metadata they were loaded with.
struct PackFlows {
    /// One descriptor per flow.
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Metadata of the pack the flows belong to.
    metadata: PackMetadata,
}
1722
/// Manifest extension keys that `flows_from_runtime_extension` accepts as
/// carrying runtime flow definitions.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1728
/// Object-shaped runtime flow payload: `{ "flows": [...] }`.
///
/// This is the first shape `decode_runtime_flow_extension` tries.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The flows carried by the bundle.
    flows: Vec<RuntimeFlow>,
}
1733
/// Serialized form of a flow as found in a runtime flow extension.
///
/// Converted into a [`Flow`] by `runtime_flow_to_flow`.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow id; must parse as a `FlowId`.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Optional schema version; the conversion falls back to `"1.0"`.
    #[serde(default)]
    schema_version: Option<String>,
    /// Optional start node; used as the `default` entrypoint when
    /// `entrypoints` is empty.
    #[serde(default)]
    start: Option<String>,
    /// Named entrypoints; empty when absent.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by node id.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; the conversion falls back to the default.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
1749
/// Serialized form of a single node inside a [`RuntimeFlow`].
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component id; also accepted under the key `component`.
    #[serde(alias = "component")]
    component_id: String,
    /// Optional operation name; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Operation payload; also accepted under `payload` or `input`. Becomes
    /// the node's input mapping during conversion.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Optional routing; the conversion falls back to `Routing::End`.
    #[serde(default)]
    routing: Option<Routing>,
    /// Optional telemetry hints; the conversion falls back to the default.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
1763
1764fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1765    if bytes.is_empty() {
1766        return Ok(Value::Null);
1767    }
1768    serde_json::from_slice(&bytes).or_else(|_| {
1769        String::from_utf8(bytes)
1770            .map(Value::String)
1771            .map_err(|err| anyhow!(err))
1772    })
1773}
1774
1775impl PackFlows {
1776    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1777        if let Some(flows) = flows_from_runtime_extension(&manifest) {
1778            return flows;
1779        }
1780        let descriptors = manifest
1781            .flows
1782            .iter()
1783            .map(|entry| FlowDescriptor {
1784                id: entry.id.as_str().to_string(),
1785                flow_type: flow_kind_to_str(entry.kind).to_string(),
1786                pack_id: manifest.pack_id.as_str().to_string(),
1787                profile: manifest.pack_id.as_str().to_string(),
1788                version: manifest.version.to_string(),
1789                description: None,
1790            })
1791            .collect();
1792        let mut flows = HashMap::new();
1793        for entry in &manifest.flows {
1794            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1795        }
1796        Self {
1797            metadata: PackMetadata::from_manifest(&manifest),
1798            descriptors,
1799            flows,
1800        }
1801    }
1802}
1803
1804fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1805    let extensions = manifest.extensions.as_ref()?;
1806    let extension = extensions.iter().find_map(|(key, ext)| {
1807        if RUNTIME_FLOW_EXTENSION_IDS
1808            .iter()
1809            .any(|candidate| candidate == key)
1810        {
1811            Some(ext)
1812        } else {
1813            None
1814        }
1815    })?;
1816    let runtime_flows = match decode_runtime_flow_extension(extension) {
1817        Some(flows) if !flows.is_empty() => flows,
1818        _ => return None,
1819    };
1820
1821    let descriptors = runtime_flows
1822        .iter()
1823        .map(|flow| FlowDescriptor {
1824            id: flow.id.as_str().to_string(),
1825            flow_type: flow_kind_to_str(flow.kind).to_string(),
1826            pack_id: manifest.pack_id.as_str().to_string(),
1827            profile: manifest.pack_id.as_str().to_string(),
1828            version: manifest.version.to_string(),
1829            description: None,
1830        })
1831        .collect::<Vec<_>>();
1832    let flows = runtime_flows
1833        .into_iter()
1834        .map(|flow| (flow.id.as_str().to_string(), flow))
1835        .collect();
1836
1837    Some(PackFlows {
1838        metadata: PackMetadata::from_manifest(manifest),
1839        descriptors,
1840        flows,
1841    })
1842}
1843
1844fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1845    let value = match extension.inline.as_ref()? {
1846        ExtensionInline::Other(value) => value.clone(),
1847        _ => return None,
1848    };
1849
1850    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1851        return Some(collect_runtime_flows(bundle.flows));
1852    }
1853
1854    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1855        return Some(collect_runtime_flows(flows));
1856    }
1857
1858    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1859        return Some(flows);
1860    }
1861
1862    warn!(
1863        extension = %extension.kind,
1864        version = %extension.version,
1865        "runtime flow extension present but could not be decoded"
1866    );
1867    None
1868}
1869
1870fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1871    flows
1872        .into_iter()
1873        .filter_map(|flow| match runtime_flow_to_flow(flow) {
1874            Ok(flow) => Some(flow),
1875            Err(err) => {
1876                warn!(error = %err, "failed to decode runtime flow");
1877                None
1878            }
1879        })
1880        .collect()
1881}
1882
1883fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1884    let flow_id = FlowId::from_str(&runtime.id)
1885        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1886    let mut entrypoints = runtime.entrypoints;
1887    if entrypoints.is_empty()
1888        && let Some(start) = &runtime.start
1889    {
1890        entrypoints.insert("default".into(), Value::String(start.clone()));
1891    }
1892
1893    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1894    for (id, node) in runtime.nodes {
1895        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1896        let component_id = ComponentId::from_str(&node.component_id)
1897            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1898        let component = FlowComponentRef {
1899            id: component_id,
1900            pack_alias: None,
1901            operation: node.operation_name,
1902        };
1903        let routing = node.routing.unwrap_or(Routing::End);
1904        let telemetry = node.telemetry.unwrap_or_default();
1905        nodes.insert(
1906            node_id.clone(),
1907            Node {
1908                id: node_id,
1909                component,
1910                input: InputMapping {
1911                    mapping: node.operation_payload,
1912                },
1913                output: OutputMapping {
1914                    mapping: Value::Null,
1915                },
1916                routing,
1917                telemetry,
1918            },
1919        );
1920    }
1921
1922    Ok(Flow {
1923        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1924        id: flow_id,
1925        kind: runtime.kind,
1926        entrypoints,
1927        nodes,
1928        metadata: runtime.metadata.unwrap_or_default(),
1929    })
1930}
1931
1932fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1933    match kind {
1934        greentic_types::FlowKind::Messaging => "messaging",
1935        greentic_types::FlowKind::Event => "event",
1936        greentic_types::FlowKind::ComponentConfig => "component-config",
1937        greentic_types::FlowKind::Job => "job",
1938        greentic_types::FlowKind::Http => "http",
1939    }
1940}
1941
1942fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1943    let mut file = archive
1944        .by_name(name)
1945        .with_context(|| format!("entry {name} missing from archive"))?;
1946    let mut buf = Vec::new();
1947    file.read_to_end(&mut buf)?;
1948    Ok(buf)
1949}
1950
1951fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1952    for node in doc.nodes.values_mut() {
1953        let Some((component_ref, payload)) = node
1954            .raw
1955            .iter()
1956            .next()
1957            .map(|(key, value)| (key.clone(), value.clone()))
1958        else {
1959            continue;
1960        };
1961        if component_ref.starts_with("emit.") {
1962            node.operation = Some(component_ref);
1963            node.payload = payload;
1964            node.raw.clear();
1965            continue;
1966        }
1967        let (target_component, operation, input, config) =
1968            infer_component_exec(&payload, &component_ref);
1969        let mut payload_obj = serde_json::Map::new();
1970        // component.exec is meta; ensure the payload carries the actual target component.
1971        payload_obj.insert("component".into(), Value::String(target_component));
1972        payload_obj.insert("operation".into(), Value::String(operation));
1973        payload_obj.insert("input".into(), input);
1974        if let Some(cfg) = config {
1975            payload_obj.insert("config".into(), cfg);
1976        }
1977        node.operation = Some("component.exec".to_string());
1978        node.payload = Value::Object(payload_obj);
1979        node.raw.clear();
1980    }
1981    doc
1982}
1983
1984fn infer_component_exec(
1985    payload: &Value,
1986    component_ref: &str,
1987) -> (String, String, Value, Option<Value>) {
1988    let default_op = if component_ref.starts_with("templating.") {
1989        "render"
1990    } else {
1991        "invoke"
1992    }
1993    .to_string();
1994
1995    if let Value::Object(map) = payload {
1996        let op = map
1997            .get("op")
1998            .or_else(|| map.get("operation"))
1999            .and_then(Value::as_str)
2000            .map(|s| s.to_string())
2001            .unwrap_or_else(|| default_op.clone());
2002
2003        let mut input = map.clone();
2004        let config = input.remove("config");
2005        let component = input
2006            .get("component")
2007            .or_else(|| input.get("component_ref"))
2008            .and_then(Value::as_str)
2009            .map(|s| s.to_string())
2010            .unwrap_or_else(|| component_ref.to_string());
2011        input.remove("component");
2012        input.remove("component_ref");
2013        input.remove("op");
2014        input.remove("operation");
2015        return (component, op, Value::Object(input), config);
2016    }
2017
2018    (component_ref.to_string(), default_op, payload.clone(), None)
2019}
2020
/// Identity of a component the pack expects, as produced by `component_specs`.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component id (or lock/source entry name when no id is recorded).
    id: String,
    /// Component version; `0.0.0` when derived from a lock or sources table.
    version: String,
    /// Path relative to the pack root, set only for legacy-manifest components.
    legacy_path: Option<String>,
}
2027
/// Where a component comes from and which digests it is expected to have.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Resolved digest of the component, when known.
    digest: Option<String>,
    /// Source reference the component was resolved from.
    source: ComponentSourceRef,
    /// Whether the artifact is bundled inline or must be fetched remotely.
    artifact: ComponentArtifactLocation,
    /// Expected `sha256:<hex>` of a bundled wasm file (pack.lock entries only).
    expected_wasm_sha256: Option<String>,
    /// Set for bundled pack.lock entries that carry no hash while missing
    /// hashes are allowed. NOTE(review): presumably makes the loader skip
    /// digest checks — confirm against the consumer.
    skip_digest_verification: bool,
}
2036
/// Location of a component's wasm artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// Bundled with the pack at `wasm_path`.
    Inline { wasm_path: String },
    /// Not bundled with the pack.
    Remote,
}
2042
/// Deserialized contents of a pack lock file.
#[derive(Clone, Debug, Deserialize)]
struct PackLockV1 {
    /// Schema version; `load_pack_lock` rejects anything other than `1`.
    schema_version: u32,
    /// Locked components.
    components: Vec<PackLockComponent>,
}
2048
2049#[derive(Clone, Debug, Deserialize)]
2050struct PackLockComponent {
2051    name: String,
2052    #[serde(default, rename = "source_ref")]
2053    source_ref: Option<String>,
2054    #[serde(default, rename = "ref")]
2055    legacy_ref: Option<String>,
2056    #[serde(default)]
2057    component_id: Option<ComponentId>,
2058    #[serde(default)]
2059    bundled: Option<bool>,
2060    #[serde(default, rename = "bundled_path")]
2061    bundled_path: Option<String>,
2062    #[serde(default, rename = "path")]
2063    legacy_path: Option<String>,
2064    #[serde(default)]
2065    wasm_sha256: Option<String>,
2066    #[serde(default, rename = "sha256")]
2067    legacy_sha256: Option<String>,
2068    #[serde(default)]
2069    resolved_digest: Option<String>,
2070    #[serde(default)]
2071    digest: Option<String>,
2072}
2073
2074fn component_specs(
2075    manifest: Option<&greentic_types::PackManifest>,
2076    legacy_manifest: Option<&legacy_pack::PackManifest>,
2077    component_sources: Option<&ComponentSourcesV1>,
2078    pack_lock: Option<&PackLockV1>,
2079) -> Vec<ComponentSpec> {
2080    if let Some(manifest) = manifest {
2081        if !manifest.components.is_empty() {
2082            return manifest
2083                .components
2084                .iter()
2085                .map(|entry| ComponentSpec {
2086                    id: entry.id.as_str().to_string(),
2087                    version: entry.version.to_string(),
2088                    legacy_path: None,
2089                })
2090                .collect();
2091        }
2092        if let Some(lock) = pack_lock {
2093            let mut seen = HashSet::new();
2094            let mut specs = Vec::new();
2095            for entry in &lock.components {
2096                let id = entry
2097                    .component_id
2098                    .as_ref()
2099                    .map(|id| id.as_str())
2100                    .unwrap_or(entry.name.as_str());
2101                if seen.insert(id.to_string()) {
2102                    specs.push(ComponentSpec {
2103                        id: id.to_string(),
2104                        version: "0.0.0".to_string(),
2105                        legacy_path: None,
2106                    });
2107                }
2108            }
2109            return specs;
2110        }
2111        if let Some(sources) = component_sources {
2112            let mut seen = HashSet::new();
2113            let mut specs = Vec::new();
2114            for entry in &sources.components {
2115                let id = entry
2116                    .component_id
2117                    .as_ref()
2118                    .map(|id| id.as_str())
2119                    .unwrap_or(entry.name.as_str());
2120                if seen.insert(id.to_string()) {
2121                    specs.push(ComponentSpec {
2122                        id: id.to_string(),
2123                        version: "0.0.0".to_string(),
2124                        legacy_path: None,
2125                    });
2126                }
2127            }
2128            return specs;
2129        }
2130    }
2131    if let Some(legacy_manifest) = legacy_manifest {
2132        return legacy_manifest
2133            .components
2134            .iter()
2135            .map(|entry| ComponentSpec {
2136                id: entry.name.clone(),
2137                version: entry.version.to_string(),
2138                legacy_path: Some(entry.file_wasm.clone()),
2139            })
2140            .collect();
2141    }
2142    Vec::new()
2143}
2144
2145fn component_sources_table(
2146    sources: Option<&ComponentSourcesV1>,
2147) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2148    let Some(sources) = sources else {
2149        return Ok(None);
2150    };
2151    let mut table = HashMap::new();
2152    for entry in &sources.components {
2153        let artifact = match &entry.artifact {
2154            ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2155                wasm_path: wasm_path.clone(),
2156            },
2157            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2158        };
2159        let info = ComponentSourceInfo {
2160            digest: Some(entry.resolved.digest.clone()),
2161            source: entry.source.clone(),
2162            artifact,
2163            expected_wasm_sha256: None,
2164            skip_digest_verification: false,
2165        };
2166        if let Some(component_id) = entry.component_id.as_ref() {
2167            table.insert(component_id.as_str().to_string(), info.clone());
2168        }
2169        table.insert(entry.name.clone(), info);
2170    }
2171    Ok(Some(table))
2172}
2173
2174fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2175    let lock_path = if path.is_dir() {
2176        let candidate = path.join("pack.lock");
2177        if candidate.exists() {
2178            Some(candidate)
2179        } else {
2180            let candidate = path.join("pack.lock.json");
2181            candidate.exists().then_some(candidate)
2182        }
2183    } else {
2184        None
2185    };
2186    let Some(lock_path) = lock_path else {
2187        return Ok(None);
2188    };
2189    let raw = std::fs::read_to_string(&lock_path)
2190        .with_context(|| format!("failed to read {}", lock_path.display()))?;
2191    let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2192    if lock.schema_version != 1 {
2193        bail!("pack.lock schema_version must be 1");
2194    }
2195    Ok(Some(lock))
2196}
2197
/// Lists the directories in which to look for a pack lock file.
///
/// For a directory pack the only root is the pack directory itself.
/// Otherwise the roots are the parent and grandparent (as far as they exist)
/// of `archive_hint` when given, or of `pack_path` when not. `pack_path` is
/// not consulted when a hint is given, even if the hint has no parent.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    let anchor = archive_hint.unwrap_or(pack_path);
    // `ancestors` yields the path itself first, then parent, grandparent, ...
    anchor
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
2222
2223fn normalize_sha256(digest: &str) -> Result<String> {
2224    let trimmed = digest.trim();
2225    if trimmed.is_empty() {
2226        bail!("sha256 digest cannot be empty");
2227    }
2228    if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2229        if stripped.is_empty() {
2230            bail!("sha256 digest must include hex bytes after sha256:");
2231        }
2232        return Ok(trimmed.to_string());
2233    }
2234    if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2235        return Ok(format!("sha256:{trimmed}"));
2236    }
2237    bail!("sha256 digest must be hex or sha256:<hex>");
2238}
2239
2240fn component_sources_table_from_pack_lock(
2241    lock: &PackLockV1,
2242    allow_missing_hash: bool,
2243) -> Result<HashMap<String, ComponentSourceInfo>> {
2244    let mut table = HashMap::new();
2245    let mut names = HashSet::new();
2246    for entry in &lock.components {
2247        if !names.insert(entry.name.clone()) {
2248            bail!(
2249                "pack.lock contains duplicate component name `{}`",
2250                entry.name
2251            );
2252        }
2253        let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2254            (Some(primary), Some(legacy)) => {
2255                if primary != legacy {
2256                    bail!(
2257                        "pack.lock component {} has conflicting refs: {} vs {}",
2258                        entry.name,
2259                        primary,
2260                        legacy
2261                    );
2262                }
2263                primary.as_str()
2264            }
2265            (Some(primary), None) => primary.as_str(),
2266            (None, Some(legacy)) => legacy.as_str(),
2267            (None, None) => {
2268                bail!("pack.lock component {} missing source_ref", entry.name);
2269            }
2270        };
2271        let source: ComponentSourceRef = source_ref
2272            .parse()
2273            .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2274        let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2275            (Some(primary), Some(legacy)) => {
2276                if primary != legacy {
2277                    bail!(
2278                        "pack.lock component {} has conflicting bundled paths: {} vs {}",
2279                        entry.name,
2280                        primary,
2281                        legacy
2282                    );
2283                }
2284                Some(primary.clone())
2285            }
2286            (Some(primary), None) => Some(primary.clone()),
2287            (None, Some(legacy)) => Some(legacy.clone()),
2288            (None, None) => None,
2289        };
2290        let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2291        let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2292            let wasm_path = bundled_path.ok_or_else(|| {
2293                anyhow!(
2294                    "pack.lock component {} marked bundled but bundled_path is missing",
2295                    entry.name
2296                )
2297            })?;
2298            let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2299                (Some(primary), Some(legacy)) => {
2300                    if primary != legacy {
2301                        bail!(
2302                            "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2303                            entry.name,
2304                            primary,
2305                            legacy
2306                        );
2307                    }
2308                    Some(primary.as_str())
2309                }
2310                (Some(primary), None) => Some(primary.as_str()),
2311                (None, Some(legacy)) => Some(legacy.as_str()),
2312                (None, None) => None,
2313            };
2314            let expected = match expected_raw {
2315                Some(value) => Some(normalize_sha256(value)?),
2316                None => None,
2317            };
2318            if expected.is_none() && !allow_missing_hash {
2319                bail!(
2320                    "pack.lock component {} missing wasm_sha256 for bundled component",
2321                    entry.name
2322                );
2323            }
2324            (
2325                ComponentArtifactLocation::Inline { wasm_path },
2326                expected.clone(),
2327                expected,
2328                allow_missing_hash && expected_raw.is_none(),
2329            )
2330        } else {
2331            if source.is_tag() {
2332                bail!(
2333                    "component {} uses tag ref {} but is not bundled; rebuild the pack",
2334                    entry.name,
2335                    source
2336                );
2337            }
2338            let expected = entry
2339                .resolved_digest
2340                .as_deref()
2341                .or(entry.digest.as_deref())
2342                .ok_or_else(|| {
2343                    anyhow!(
2344                        "pack.lock component {} missing resolved_digest for remote component",
2345                        entry.name
2346                    )
2347                })?;
2348            (
2349                ComponentArtifactLocation::Remote,
2350                Some(normalize_digest(expected)),
2351                None,
2352                false,
2353            )
2354        };
2355        let info = ComponentSourceInfo {
2356            digest,
2357            source,
2358            artifact,
2359            expected_wasm_sha256,
2360            skip_digest_verification,
2361        };
2362        if let Some(component_id) = entry.component_id.as_ref() {
2363            let key = component_id.as_str().to_string();
2364            if table.contains_key(&key) {
2365                bail!(
2366                    "pack.lock contains duplicate component id `{}`",
2367                    component_id.as_str()
2368                );
2369            }
2370            table.insert(key, info.clone());
2371        }
2372        if entry.name
2373            != entry
2374                .component_id
2375                .as_ref()
2376                .map(|id| id.as_str())
2377                .unwrap_or("")
2378        {
2379            table.insert(entry.name.clone(), info);
2380        }
2381    }
2382    Ok(table)
2383}
2384
2385fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2386    if let Some(path) = &spec.legacy_path {
2387        return root.join(path);
2388    }
2389    root.join("components").join(format!("{}.wasm", spec.id))
2390}
2391
/// Ensures a digest carries an algorithm prefix.
///
/// Digests already starting with `sha256:` or `blake3:` are returned
/// unchanged; anything else is assumed to be SHA-256 and gets `sha256:`
/// prepended. No validation of the digest itself is performed.
fn normalize_digest(digest: &str) -> String {
    const KNOWN_PREFIXES: [&str; 2] = ["sha256:", "blake3:"];

    let already_prefixed = KNOWN_PREFIXES
        .iter()
        .any(|prefix| digest.starts_with(*prefix));
    if already_prefixed {
        digest.to_owned()
    } else {
        format!("sha256:{digest}")
    }
}
2399
2400fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2401    if digest.starts_with("blake3:") {
2402        let hash = blake3::hash(bytes);
2403        return Ok(format!("blake3:{}", hash.to_hex()));
2404    }
2405    let mut hasher = sha2::Sha256::new();
2406    hasher.update(bytes);
2407    Ok(format!("sha256:{:x}", hasher.finalize()))
2408}
2409
2410fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2411    let mut hasher = sha2::Sha256::new();
2412    hasher.update(bytes);
2413    format!("sha256:{:x}", hasher.finalize())
2414}
2415
2416fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2417    let wasm_digest = digest
2418        .map(normalize_digest)
2419        .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2420    ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2421}
2422
2423async fn compile_component_with_cache(
2424    cache: &CacheManager,
2425    engine: &Engine,
2426    digest: Option<&str>,
2427    bytes: Vec<u8>,
2428) -> Result<Arc<Component>> {
2429    let key = build_artifact_key(cache, digest, &bytes);
2430    cache.get_component(engine, &key, || Ok(bytes)).await
2431}
2432
2433fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2434    let normalized_expected = normalize_digest(expected);
2435    let actual = compute_digest_for(bytes, &normalized_expected)?;
2436    if normalize_digest(&actual) != normalized_expected {
2437        bail!(
2438            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2439        );
2440    }
2441    Ok(())
2442}
2443
2444fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2445    let normalized_expected = normalize_sha256(expected)?;
2446    let actual = compute_sha256_digest_for(bytes);
2447    if actual != normalized_expected {
2448        bail!(
2449            "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2450        );
2451    }
2452    Ok(())
2453}
2454
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    /// A non-bundled lock entry that references a tag must be rejected with
    /// a hint to rebuild the pack.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let tag_only = PackLockComponent {
            name: "templates".to_string(),
            source_ref: Some("oci://registry.test/templates:latest".to_string()),
            legacy_ref: None,
            component_id: None,
            bundled: Some(false),
            bundled_path: None,
            legacy_path: None,
            wasm_sha256: None,
            legacy_sha256: None,
            resolved_digest: None,
            digest: None,
        };
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![tag_only],
        };

        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
        let message = err.to_string();
        assert!(
            message.contains("tag ref") && message.contains("rebuild the pack"),
            "unexpected error: {err}"
        );
    }

    /// Loading a bundled component whose bytes do not match the expected
    /// wasm hash must fail with a bundled digest mismatch.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let temp = TempDir::new().expect("temp dir");

        // Cache rooted inside the temp dir.
        let engine = Engine::default();
        let engine_profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        let cache = CacheManager::new(
            CacheConfig {
                root: temp.path().join("cache"),
                ..CacheConfig::default()
            },
            engine_profile,
        );

        // Copy a fixture component next to the cache so it can be found
        // relative to the temp root.
        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
        let fixture_bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
        let wasm_path = temp.path().join("component.wasm");
        std::fs::write(&wasm_path, &fixture_bytes).expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::new();
        missing.insert(spec.id.clone());

        // Source info with a hash the fixture cannot match.
        let bogus_info = ComponentSourceInfo {
            digest: Some("sha256:deadbeef".to_string()),
            source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
            artifact: ComponentArtifactLocation::Inline {
                wasm_path: "component.wasm".to_string(),
            },
            expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
            skip_digest_verification: false,
        };
        let mut sources = HashMap::new();
        sources.insert(spec.id.clone(), bogus_info);

        let mut loaded = HashMap::new();
        let outcome = rt.block_on(load_components_from_sources(
            &cache,
            &engine,
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(temp.path()),
            None,
        ));

        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
2544
2545#[cfg(test)]
2546mod pack_resolution_prop_tests {
2547    use super::*;
2548    use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
2549    use proptest::prelude::*;
2550    use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
2551    use std::collections::BTreeSet;
2552    use std::path::Path;
2553    use std::str::FromStr;
2554
    /// Lookup key for a resolution request, tagged by how the component was
    /// addressed. Both variants carry the key string used for the table lookup.
    #[derive(Clone, Debug)]
    enum ResolveRequest {
        /// Address the component by its component id string.
        ById(String),
        /// Address the component by its name.
        ByName(String),
    }
2560
    /// Flattened, comparable snapshot of a successfully resolved source entry.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolvedComponent {
        /// Key the lookup was performed with.
        key: String,
        /// String form of the component source reference.
        source: String,
        /// Artifact location kind: `"inline"` or `"remote"`.
        artifact: String,
        /// Digest recorded on the source entry, if any.
        digest: Option<String>,
        /// Expected wasm sha256 recorded on the source entry, if any.
        expected_wasm_sha256: Option<String>,
        /// Value of the entry's `skip_digest_verification` flag.
        skip_digest_verification: bool,
    }
2570
    /// Comparable description of a failed resolution.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolveError {
        /// Stable classification code (one of `known_error_codes()`).
        code: String,
        /// Human-readable message of the underlying failure.
        message: String,
        /// Key of the request that was being resolved when the failure occurred.
        context_key: String,
    }
2577
    /// One generated property-test case.
    #[derive(Clone, Debug)]
    struct Scenario {
        /// Pack lock to resolve against; when present it is used instead of
        /// `component_sources`.
        pack_lock: Option<PackLockV1>,
        /// Component-sources extension payload used when no pack lock is given.
        component_sources: Option<ComponentSourcesV1>,
        /// Which component the scenario asks for.
        request: ResolveRequest,
        /// Digest that `bytes` is verified against, when set.
        expected_sha256: Option<String>,
        /// Stand-in component bytes used for the digest check.
        bytes: Vec<u8>,
    }
2586
2587    fn resolve_component_test(
2588        sources: Option<&ComponentSourcesV1>,
2589        lock: Option<&PackLockV1>,
2590        request: &ResolveRequest,
2591    ) -> Result<ResolvedComponent, ResolveError> {
2592        let table = if let Some(lock) = lock {
2593            component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
2594                code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
2595                message: err.to_string(),
2596                context_key: request_key(request).to_string(),
2597            })?
2598        } else {
2599            let sources = component_sources_table(sources).map_err(|err| ResolveError {
2600                code: "component_sources_error".to_string(),
2601                message: err.to_string(),
2602                context_key: request_key(request).to_string(),
2603            })?;
2604            sources.ok_or_else(|| ResolveError {
2605                code: "missing_component_sources".to_string(),
2606                message: "component sources not provided".to_string(),
2607                context_key: request_key(request).to_string(),
2608            })?
2609        };
2610
2611        let key = request_key(request);
2612        let source = table.get(key).ok_or_else(|| ResolveError {
2613            code: "component_not_found".to_string(),
2614            message: format!("component {key} not found"),
2615            context_key: key.to_string(),
2616        })?;
2617
2618        Ok(ResolvedComponent {
2619            key: key.to_string(),
2620            source: source.source.to_string(),
2621            artifact: match source.artifact {
2622                ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
2623                ComponentArtifactLocation::Remote => "remote".to_string(),
2624            },
2625            digest: source.digest.clone(),
2626            expected_wasm_sha256: source.expected_wasm_sha256.clone(),
2627            skip_digest_verification: source.skip_digest_verification,
2628        })
2629    }
2630
2631    fn request_key(request: &ResolveRequest) -> &str {
2632        match request {
2633            ResolveRequest::ById(value) => value.as_str(),
2634            ResolveRequest::ByName(value) => value.as_str(),
2635        }
2636    }
2637
2638    fn classify_pack_lock_error(message: &str) -> &'static str {
2639        if message.contains("duplicate component name") {
2640            "duplicate_name"
2641        } else if message.contains("duplicate component id") {
2642            "duplicate_id"
2643        } else if message.contains("conflicting refs") {
2644            "conflicting_ref"
2645        } else if message.contains("conflicting bundled paths") {
2646            "conflicting_bundled_path"
2647        } else if message.contains("conflicting wasm_sha256") {
2648            "conflicting_wasm_sha256"
2649        } else if message.contains("missing source_ref") {
2650            "missing_source_ref"
2651        } else if message.contains("marked bundled but bundled_path is missing") {
2652            "missing_bundled_path"
2653        } else if message.contains("missing wasm_sha256") {
2654            "missing_wasm_sha256"
2655        } else if message.contains("tag ref") && message.contains("not bundled") {
2656            "tag_ref_requires_bundle"
2657        } else if message.contains("missing resolved_digest") {
2658            "missing_resolved_digest"
2659        } else if message.contains("invalid component ref") {
2660            "invalid_component_ref"
2661        } else if message.contains("sha256 digest") {
2662            "invalid_sha256"
2663        } else {
2664            "unknown_error"
2665        }
2666    }
2667
2668    fn known_error_codes() -> BTreeSet<&'static str> {
2669        [
2670            "component_sources_error",
2671            "missing_component_sources",
2672            "component_not_found",
2673            "duplicate_name",
2674            "duplicate_id",
2675            "conflicting_ref",
2676            "conflicting_bundled_path",
2677            "conflicting_wasm_sha256",
2678            "missing_source_ref",
2679            "missing_bundled_path",
2680            "missing_wasm_sha256",
2681            "tag_ref_requires_bundle",
2682            "missing_resolved_digest",
2683            "invalid_component_ref",
2684            "invalid_sha256",
2685            "unknown_error",
2686        ]
2687        .into_iter()
2688        .collect()
2689    }
2690
2691    fn proptest_config() -> ProptestConfig {
2692        let cases = std::env::var("PROPTEST_CASES")
2693            .ok()
2694            .and_then(|value| value.parse::<u32>().ok())
2695            .unwrap_or(128);
2696        ProptestConfig {
2697            cases,
2698            failure_persistence: None,
2699            ..ProptestConfig::default()
2700        }
2701    }
2702
2703    fn proptest_seed() -> Option<[u8; 32]> {
2704        let seed = std::env::var("PROPTEST_SEED")
2705            .ok()
2706            .and_then(|value| value.parse::<u64>().ok())?;
2707        let mut bytes = [0u8; 32];
2708        bytes[..8].copy_from_slice(&seed.to_le_bytes());
2709        Some(bytes)
2710    }
2711
2712    fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
2713        let config = ProptestConfig {
2714            cases,
2715            failure_persistence: None,
2716            ..ProptestConfig::default()
2717        };
2718        let mut runner = match seed {
2719            Some(bytes) => {
2720                TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
2721            }
2722            None => TestRunner::new(config),
2723        };
2724        runner
2725            .run(&strategy, |scenario| {
2726                run_scenario(&scenario);
2727                Ok(())
2728            })
2729            .unwrap();
2730    }
2731
2732    fn run_scenario(scenario: &Scenario) {
2733        let known_codes = known_error_codes();
2734        let first = resolve_component_test(
2735            scenario.component_sources.as_ref(),
2736            scenario.pack_lock.as_ref(),
2737            &scenario.request,
2738        );
2739        let second = resolve_component_test(
2740            scenario.component_sources.as_ref(),
2741            scenario.pack_lock.as_ref(),
2742            &scenario.request,
2743        );
2744        assert_eq!(normalize_result(&first), normalize_result(&second));
2745
2746        if let Some(lock) = scenario.pack_lock.as_ref() {
2747            let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
2748            assert_eq!(normalize_result(&first), normalize_result(&lock_only));
2749        }
2750
2751        if let Err(err) = first.as_ref() {
2752            assert!(
2753                known_codes.contains(err.code.as_str()),
2754                "unexpected error code {}: {}",
2755                err.code,
2756                err.message
2757            );
2758        }
2759
2760        if let Some(expected) = scenario.expected_sha256.as_deref() {
2761            let expected_ok =
2762                verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
2763            let actual = compute_sha256_digest_for(&scenario.bytes);
2764            if actual == normalize_sha256(expected).unwrap_or_default() {
2765                assert!(expected_ok, "expected sha256 match to succeed");
2766            } else {
2767                assert!(!expected_ok, "expected sha256 mismatch to fail");
2768            }
2769        }
2770    }
2771
2772    fn normalize_result(
2773        result: &Result<ResolvedComponent, ResolveError>,
2774    ) -> Result<ResolvedComponent, ResolveError> {
2775        match result {
2776            Ok(value) => Ok(value.clone()),
2777            Err(err) => Err(err.clone()),
2778        }
2779    }
2780
2781    fn scenario_strategy() -> impl Strategy<Value = Scenario> {
2782        let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
2783        let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
2784        let tag_ref = any::<bool>();
2785        let bundled = any::<bool>();
2786        let include_sha = any::<bool>();
2787        let include_component_id = any::<bool>();
2788        let request_by_id = any::<bool>();
2789        let use_lock = any::<bool>();
2790        let use_sources = any::<bool>();
2791        let bytes = prop::collection::vec(any::<u8>(), 1..64);
2792
2793        (
2794            name,
2795            alt_name,
2796            tag_ref,
2797            bundled,
2798            include_sha,
2799            include_component_id,
2800            request_by_id,
2801            use_lock,
2802            use_sources,
2803            bytes,
2804        )
2805            .prop_map(
2806                |(
2807                    name,
2808                    alt_name,
2809                    tag_ref,
2810                    bundled,
2811                    include_sha,
2812                    include_component_id,
2813                    request_by_id,
2814                    use_lock,
2815                    use_sources,
2816                    bytes,
2817                )| {
2818                    let component_id_str = if include_component_id {
2819                        alt_name.clone()
2820                    } else {
2821                        name.clone()
2822                    };
2823                    let component_id = ComponentId::from_str(&component_id_str).ok();
2824                    let source_ref = if tag_ref {
2825                        format!("oci://registry.test/{name}:v1")
2826                    } else {
2827                        format!(
2828                            "oci://registry.test/{name}@sha256:{}",
2829                            hex::encode([0x11u8; 32])
2830                        )
2831                    };
2832                    let expected_sha256 = if bundled && include_sha {
2833                        Some(compute_sha256_digest_for(&bytes))
2834                    } else {
2835                        None
2836                    };
2837
2838                    let lock_component = PackLockComponent {
2839                        name: name.clone(),
2840                        source_ref: Some(source_ref),
2841                        legacy_ref: None,
2842                        component_id,
2843                        bundled: Some(bundled),
2844                        bundled_path: if bundled {
2845                            Some(format!("components/{name}.wasm"))
2846                        } else {
2847                            None
2848                        },
2849                        legacy_path: None,
2850                        wasm_sha256: expected_sha256.clone(),
2851                        legacy_sha256: None,
2852                        resolved_digest: if bundled {
2853                            None
2854                        } else {
2855                            Some("sha256:deadbeef".to_string())
2856                        },
2857                        digest: None,
2858                    };
2859
2860                    let pack_lock = if use_lock {
2861                        Some(PackLockV1 {
2862                            schema_version: 1,
2863                            components: vec![lock_component],
2864                        })
2865                    } else {
2866                        None
2867                    };
2868
2869                    let component_sources = if use_sources {
2870                        Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
2871                            name: name.clone(),
2872                            component_id: ComponentId::from_str(&name).ok(),
2873                            source: ComponentSourceRef::from_str(
2874                                "oci://registry.test/component@sha256:deadbeef",
2875                            )
2876                            .expect("component ref"),
2877                            resolved: ResolvedComponentV1 {
2878                                digest: "sha256:deadbeef".to_string(),
2879                                signature: None,
2880                                signed_by: None,
2881                            },
2882                            artifact: if bundled {
2883                                ArtifactLocationV1::Inline {
2884                                    wasm_path: format!("components/{name}.wasm"),
2885                                    manifest_path: None,
2886                                }
2887                            } else {
2888                                ArtifactLocationV1::Remote
2889                            },
2890                            licensing_hint: None,
2891                            metering_hint: None,
2892                        }]))
2893                    } else {
2894                        None
2895                    };
2896
2897                    let request = if request_by_id {
2898                        ResolveRequest::ById(component_id_str.clone())
2899                    } else {
2900                        ResolveRequest::ByName(name.clone())
2901                    };
2902
2903                    Scenario {
2904                        pack_lock,
2905                        component_sources,
2906                        request,
2907                        expected_sha256,
2908                        bytes,
2909                    }
2910                },
2911            )
2912    }
2913
2914    #[test]
2915    fn pack_resolution_proptest() {
2916        let seed = proptest_seed();
2917        run_cases(scenario_strategy(), proptest_config().cases, seed);
2918    }
2919
2920    #[test]
2921    fn pack_resolution_regression_seeds() {
2922        let seeds_path =
2923            Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
2924        let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
2925        for line in raw.lines() {
2926            let line = line.trim();
2927            if line.is_empty() || line.starts_with('#') {
2928                continue;
2929            }
2930            let seed = line.parse::<u64>().expect("seed must be an integer");
2931            let mut bytes = [0u8; 32];
2932            bytes[..8].copy_from_slice(&seed.to_le_bytes());
2933            run_cases(scenario_strategy(), 1, Some(bytes));
2934        }
2935    }
2936}
2937
2938fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
2939    let mut opts = DistOptions {
2940        allow_tags: true,
2941        ..DistOptions::default()
2942    };
2943    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2944        opts.cache_dir = cache_dir;
2945    }
2946    if component_resolution.dist_offline {
2947        opts.offline = true;
2948    }
2949    opts
2950}
2951
2952#[allow(clippy::too_many_arguments)]
2953async fn load_components_from_sources(
2954    cache: &CacheManager,
2955    engine: &Engine,
2956    component_sources: &HashMap<String, ComponentSourceInfo>,
2957    component_resolution: &ComponentResolution,
2958    specs: &[ComponentSpec],
2959    missing: &mut HashSet<String>,
2960    into: &mut HashMap<String, PackComponent>,
2961    materialized_root: Option<&Path>,
2962    archive_hint: Option<&Path>,
2963) -> Result<()> {
2964    let mut archive = if let Some(path) = archive_hint {
2965        Some(
2966            ZipArchive::new(File::open(path)?)
2967                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
2968        )
2969    } else {
2970        None
2971    };
2972    let mut dist_client: Option<DistClient> = None;
2973
2974    for spec in specs {
2975        if !missing.contains(&spec.id) {
2976            continue;
2977        }
2978        let Some(source) = component_sources.get(&spec.id) else {
2979            continue;
2980        };
2981
2982        let bytes = match &source.artifact {
2983            ComponentArtifactLocation::Inline { wasm_path } => {
2984                if let Some(root) = materialized_root {
2985                    let path = root.join(wasm_path);
2986                    if path.exists() {
2987                        std::fs::read(&path).with_context(|| {
2988                            format!(
2989                                "failed to read inline component {} from {}",
2990                                spec.id,
2991                                path.display()
2992                            )
2993                        })?
2994                    } else if archive.is_none() {
2995                        bail!("inline component {} missing at {}", spec.id, path.display());
2996                    } else {
2997                        read_entry(
2998                            archive.as_mut().expect("archive present when needed"),
2999                            wasm_path,
3000                        )
3001                        .with_context(|| {
3002                            format!(
3003                                "inline component {} missing at {} in pack archive",
3004                                spec.id, wasm_path
3005                            )
3006                        })?
3007                    }
3008                } else if let Some(archive) = archive.as_mut() {
3009                    read_entry(archive, wasm_path).with_context(|| {
3010                        format!(
3011                            "inline component {} missing at {} in pack archive",
3012                            spec.id, wasm_path
3013                        )
3014                    })?
3015                } else {
3016                    bail!(
3017                        "inline component {} missing and no pack source available",
3018                        spec.id
3019                    );
3020                }
3021            }
3022            ComponentArtifactLocation::Remote => {
3023                if source.source.is_tag() {
3024                    bail!(
3025                        "component {} uses tag ref {} but is not bundled; rebuild the pack",
3026                        spec.id,
3027                        source.source
3028                    );
3029                }
3030                let client = dist_client.get_or_insert_with(|| {
3031                    DistClient::new(dist_options_from(component_resolution))
3032                });
3033                let reference = source.source.to_string();
3034                fault::maybe_fail_asset(&reference)
3035                    .await
3036                    .with_context(|| format!("fault injection blocked asset {reference}"))?;
3037                let digest = source.digest.as_deref().ok_or_else(|| {
3038                    anyhow!(
3039                        "component {} missing expected digest for remote component",
3040                        spec.id
3041                    )
3042                })?;
3043                let cache_path = if component_resolution.dist_offline {
3044                    client
3045                        .fetch_digest(digest)
3046                        .await
3047                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
3048                } else {
3049                    let resolved = client
3050                        .resolve_ref(&reference)
3051                        .await
3052                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3053                    let expected = normalize_digest(digest);
3054                    let actual = normalize_digest(&resolved.digest);
3055                    if expected != actual {
3056                        bail!(
3057                            "component {} digest mismatch after fetch: expected {}, got {}",
3058                            spec.id,
3059                            expected,
3060                            actual
3061                        );
3062                    }
3063                    resolved.cache_path.ok_or_else(|| {
3064                        anyhow!(
3065                            "component {} resolved from {} but cache path is missing",
3066                            spec.id,
3067                            reference
3068                        )
3069                    })?
3070                };
3071                std::fs::read(&cache_path).with_context(|| {
3072                    format!(
3073                        "failed to read cached component {} from {}",
3074                        spec.id,
3075                        cache_path.display()
3076                    )
3077                })?
3078            }
3079        };
3080
3081        if let Some(expected) = source.expected_wasm_sha256.as_deref() {
3082            verify_wasm_sha256(&spec.id, expected, &bytes)?;
3083        } else if source.skip_digest_verification {
3084            let actual = compute_sha256_digest_for(&bytes);
3085            warn!(
3086                component_id = %spec.id,
3087                digest = %actual,
3088                "bundled component missing wasm_sha256; allowing due to flag"
3089            );
3090        } else {
3091            let expected = source.digest.as_deref().ok_or_else(|| {
3092                anyhow!(
3093                    "component {} missing expected digest for verification",
3094                    spec.id
3095                )
3096            })?;
3097            verify_component_digest(&spec.id, expected, &bytes)?;
3098        }
3099        let component =
3100            compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
3101                .await
3102                .with_context(|| format!("failed to compile component {}", spec.id))?;
3103        into.insert(
3104            spec.id.clone(),
3105            PackComponent {
3106                name: spec.id.clone(),
3107                version: spec.version.clone(),
3108                component,
3109            },
3110        );
3111        missing.remove(&spec.id);
3112    }
3113
3114    Ok(())
3115}
3116
3117fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3118    match err {
3119        DistError::CacheMiss { reference: missing } => anyhow!(
3120            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3121            component_id,
3122            missing,
3123            reference
3124        ),
3125        DistError::Offline { reference: blocked } => anyhow!(
3126            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3127            component_id,
3128            blocked,
3129            reference
3130        ),
3131        DistError::AuthRequired { target } => anyhow!(
3132            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3133            component_id,
3134            target,
3135            reference
3136        ),
3137        other => anyhow!(
3138            "failed to resolve component {} from {}: {}",
3139            component_id,
3140            reference,
3141            other
3142        ),
3143    }
3144}
3145
3146async fn load_components_from_overrides(
3147    cache: &CacheManager,
3148    engine: &Engine,
3149    overrides: &HashMap<String, PathBuf>,
3150    specs: &[ComponentSpec],
3151    missing: &mut HashSet<String>,
3152    into: &mut HashMap<String, PackComponent>,
3153) -> Result<()> {
3154    for spec in specs {
3155        if !missing.contains(&spec.id) {
3156            continue;
3157        }
3158        let Some(path) = overrides.get(&spec.id) else {
3159            continue;
3160        };
3161        let bytes = std::fs::read(path)
3162            .with_context(|| format!("failed to read override component {}", path.display()))?;
3163        let component = compile_component_with_cache(cache, engine, None, bytes)
3164            .await
3165            .with_context(|| {
3166                format!(
3167                    "failed to compile component {} from override {}",
3168                    spec.id,
3169                    path.display()
3170                )
3171            })?;
3172        into.insert(
3173            spec.id.clone(),
3174            PackComponent {
3175                name: spec.id.clone(),
3176                version: spec.version.clone(),
3177                component,
3178            },
3179        );
3180        missing.remove(&spec.id);
3181    }
3182    Ok(())
3183}
3184
3185async fn load_components_from_dir(
3186    cache: &CacheManager,
3187    engine: &Engine,
3188    root: &Path,
3189    specs: &[ComponentSpec],
3190    missing: &mut HashSet<String>,
3191    into: &mut HashMap<String, PackComponent>,
3192) -> Result<()> {
3193    for spec in specs {
3194        if !missing.contains(&spec.id) {
3195            continue;
3196        }
3197        let path = component_path_for_spec(root, spec);
3198        if !path.exists() {
3199            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3200            continue;
3201        }
3202        let bytes = std::fs::read(&path)
3203            .with_context(|| format!("failed to read component {}", path.display()))?;
3204        let component = compile_component_with_cache(cache, engine, None, bytes)
3205            .await
3206            .with_context(|| {
3207                format!(
3208                    "failed to compile component {} from {}",
3209                    spec.id,
3210                    path.display()
3211                )
3212            })?;
3213        into.insert(
3214            spec.id.clone(),
3215            PackComponent {
3216                name: spec.id.clone(),
3217                version: spec.version.clone(),
3218                component,
3219            },
3220        );
3221        missing.remove(&spec.id);
3222    }
3223    Ok(())
3224}
3225
3226async fn load_components_from_archive(
3227    cache: &CacheManager,
3228    engine: &Engine,
3229    path: &Path,
3230    specs: &[ComponentSpec],
3231    missing: &mut HashSet<String>,
3232    into: &mut HashMap<String, PackComponent>,
3233) -> Result<()> {
3234    let mut archive = ZipArchive::new(File::open(path)?)
3235        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3236    for spec in specs {
3237        if !missing.contains(&spec.id) {
3238            continue;
3239        }
3240        let file_name = spec
3241            .legacy_path
3242            .clone()
3243            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3244        let bytes = match read_entry(&mut archive, &file_name) {
3245            Ok(bytes) => bytes,
3246            Err(err) => {
3247                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3248                continue;
3249            }
3250        };
3251        let component = compile_component_with_cache(cache, engine, None, bytes)
3252            .await
3253            .with_context(|| format!("failed to compile component {}", spec.id))?;
3254        into.insert(
3255            spec.id.clone(),
3256            PackComponent {
3257                name: spec.id.clone(),
3258                version: spec.version.clone(),
3259                component,
3260            },
3261        );
3262        missing.remove(&spec.id);
3263    }
3264    Ok(())
3265}
3266
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// Builds a single-node messaging flow whose `start` node only carries a
    /// raw `templating.handlebars` entry (no explicit operation or payload).
    fn raw_component_flow() -> FlowDoc {
        let mut raw_entries = IndexMap::new();
        raw_entries.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );

        let start_node = NodeDoc {
            raw: raw_entries,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut node_map = IndexMap::new();
        node_map.insert("start".into(), start_node);

        FlowDoc {
            id: "welcome".into(),
            flow_type: "messaging".into(),
            start: Some("start".into()),
            title: None,
            description: None,
            schema_version: None,
            parameters: json!({}),
            tags: Vec::new(),
            entrypoints: IndexMap::new(),
            nodes: node_map,
        }
    }

    /// A node declared through a raw component key must be rewritten into a
    /// `component.exec` operation whose payload names the component, the
    /// `render` operation, and the original raw value as input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let normalized = normalize_flow_doc(raw_component_flow());

        let start = normalized.nodes.get("start").expect("node exists");
        assert_eq!(start.operation.as_deref(), Some("component.exec"));
        // The raw entry is consumed by normalization.
        assert!(start.raw.is_empty());

        let payload = start.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component").and_then(Value::as_str),
            Some("templating.handlebars")
        );
        assert_eq!(
            payload.get("operation").and_then(Value::as_str),
            Some("render")
        );
        assert_eq!(
            payload.get("input"),
            Some(&json!({ "template": "Hi {{name}}" }))
        );
    }
}
3320
/// Summary metadata describing a pack: its identifier, version, entry flows
/// and declared secret requirements.
///
/// The type is (de)serializable through serde. `entry_flows` and
/// `secret_requirements` are optional in the serialized form and default to
/// empty lists when absent.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Pack identifier. [`PackMetadata::fallback`] derives it from the file
    /// stem of the pack path when no manifest is available.
    pub pack_id: String,
    /// Pack version rendered as a string (`"0.0.0"` for fallback metadata).
    pub version: String,
    /// Ids of the flows exposed by the pack. When decoded from an embedded
    /// manifest that lists no entry flows, every flow id in the manifest is
    /// used instead; empty ids are dropped.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secret requirements copied from the pack manifest.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
3330
3331impl PackMetadata {
3332    fn from_wasm(bytes: &[u8]) -> Option<Self> {
3333        let parser = Parser::new(0);
3334        for payload in parser.parse_all(bytes) {
3335            let payload = payload.ok()?;
3336            match payload {
3337                Payload::CustomSection(section) => {
3338                    if section.name() == "greentic.manifest"
3339                        && let Ok(meta) = Self::from_bytes(section.data())
3340                    {
3341                        return Some(meta);
3342                    }
3343                }
3344                Payload::DataSection(reader) => {
3345                    for segment in reader.into_iter().flatten() {
3346                        if let Ok(meta) = Self::from_bytes(segment.data) {
3347                            return Some(meta);
3348                        }
3349                    }
3350                }
3351                _ => {}
3352            }
3353        }
3354        None
3355    }
3356
3357    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
3358        #[derive(Deserialize)]
3359        struct RawManifest {
3360            pack_id: String,
3361            version: String,
3362            #[serde(default)]
3363            entry_flows: Vec<String>,
3364            #[serde(default)]
3365            flows: Vec<RawFlow>,
3366            #[serde(default)]
3367            secret_requirements: Vec<greentic_types::SecretRequirement>,
3368        }
3369
3370        #[derive(Deserialize)]
3371        struct RawFlow {
3372            id: String,
3373        }
3374
3375        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
3376        let mut entry_flows = if manifest.entry_flows.is_empty() {
3377            manifest.flows.iter().map(|f| f.id.clone()).collect()
3378        } else {
3379            manifest.entry_flows.clone()
3380        };
3381        entry_flows.retain(|id| !id.is_empty());
3382        Ok(Self {
3383            pack_id: manifest.pack_id,
3384            version: manifest.version,
3385            entry_flows,
3386            secret_requirements: manifest.secret_requirements,
3387        })
3388    }
3389
3390    pub fn fallback(path: &Path) -> Self {
3391        let pack_id = path
3392            .file_stem()
3393            .map(|s| s.to_string_lossy().into_owned())
3394            .unwrap_or_else(|| "unknown-pack".to_string());
3395        Self {
3396            pack_id,
3397            version: "0.0.0".to_string(),
3398            entry_flows: Vec::new(),
3399            secret_requirements: Vec::new(),
3400        }
3401    }
3402
3403    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
3404        let entry_flows = manifest
3405            .flows
3406            .iter()
3407            .map(|flow| flow.id.as_str().to_string())
3408            .collect::<Vec<_>>();
3409        Self {
3410            pack_id: manifest.pack_id.as_str().to_string(),
3411            version: manifest.version.to_string(),
3412            entry_flows,
3413            secret_requirements: manifest.secret_requirements.clone(),
3414        }
3415    }
3416}