greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21    self as host_v1, HostFns, add_all_v1_to_linker,
22    runner_host_http::RunnerHostHttp,
23    runner_host_kv::RunnerHostKv,
24    secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
25    state_store::{
26        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27        StateStoreHost, TenantCtx as StateTenantCtx,
28    },
29    telemetry_logger::{
30        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32        TenantCtx as TelemetryTenantCtx,
33    },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
41    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
42    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
43    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44    pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69#[cfg(feature = "fault-injection")]
70use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
71
72use crate::config::HostConfig;
73use crate::fault;
74use crate::secrets::{DynSecretsManager, read_secret_blocking, write_secret_blocking};
75use crate::storage::state::STATE_PREFIX;
76use crate::storage::{DynSessionStore, DynStateStore};
77use crate::verify;
78use crate::wasi::RunnerWasiPolicy;
79use tracing::warn;
80use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
81use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
82
83use greentic_flow::model::FlowDoc;
84
85#[allow(dead_code)]
86pub struct PackRuntime {
87    /// Component artifact path (wasm file).
88    path: PathBuf,
89    /// Optional archive (.gtpack) used to load flows/manifests.
90    archive_path: Option<PathBuf>,
91    config: Arc<HostConfig>,
92    engine: Engine,
93    metadata: PackMetadata,
94    manifest: Option<greentic_types::PackManifest>,
95    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
96    component_manifests: HashMap<String, ComponentManifest>,
97    mocks: Option<Arc<MockLayer>>,
98    flows: Option<PackFlows>,
99    components: HashMap<String, PackComponent>,
100    http_client: Arc<BlockingClient>,
101    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
102    session_store: Option<DynSessionStore>,
103    state_store: Option<DynStateStore>,
104    wasi_policy: Arc<RunnerWasiPolicy>,
105    provider_registry: RwLock<Option<ProviderRegistry>>,
106    secrets: DynSecretsManager,
107    oauth_config: Option<OAuthBrokerConfig>,
108    cache: CacheManager,
109}
110
111struct PackComponent {
112    #[allow(dead_code)]
113    name: String,
114    #[allow(dead_code)]
115    version: String,
116    component: Arc<Component>,
117}
118
119#[derive(Debug, Default, Clone)]
120pub struct ComponentResolution {
121    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
122    pub materialized_root: Option<PathBuf>,
123    /// Explicit overrides mapping component id -> wasm path.
124    pub overrides: HashMap<String, PathBuf>,
125    /// If true, do not fetch remote components; require cached artifacts.
126    pub dist_offline: bool,
127    /// Optional cache directory for resolved remote components.
128    pub dist_cache_dir: Option<PathBuf>,
129    /// Allow bundled components without wasm_sha256 (dev-only escape hatch).
130    pub allow_missing_hash: bool,
131}
132
133fn build_blocking_client() -> BlockingClient {
134    std::thread::spawn(|| {
135        BlockingClient::builder()
136            .no_proxy()
137            .build()
138            .expect("blocking client")
139    })
140    .join()
141    .expect("client build thread panicked")
142}
143
144fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
145    let (root, candidate) = if path.is_absolute() {
146        let parent = path
147            .parent()
148            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
149        let root = parent
150            .canonicalize()
151            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
152        let file = path
153            .file_name()
154            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
155        (root, PathBuf::from(file))
156    } else {
157        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
158        let base = if let Some(parent) = path.parent() {
159            cwd.join(parent)
160        } else {
161            cwd
162        };
163        let root = base
164            .canonicalize()
165            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
166        let file = path
167            .file_name()
168            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
169        (root, PathBuf::from(file))
170    };
171    let safe = normalize_under_root(&root, &candidate)?;
172    Ok((root, safe))
173}
174
175static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct FlowDescriptor {
179    pub id: String,
180    #[serde(rename = "type")]
181    pub flow_type: String,
182    pub pack_id: String,
183    pub profile: String,
184    pub version: String,
185    #[serde(default)]
186    pub description: Option<String>,
187}
188
189pub struct HostState {
190    #[allow(dead_code)]
191    pack_id: String,
192    config: Arc<HostConfig>,
193    http_client: Arc<BlockingClient>,
194    default_env: String,
195    #[allow(dead_code)]
196    session_store: Option<DynSessionStore>,
197    state_store: Option<DynStateStore>,
198    mocks: Option<Arc<MockLayer>>,
199    secrets: DynSecretsManager,
200    oauth_config: Option<OAuthBrokerConfig>,
201    oauth_host: OAuthBrokerHost,
202    exec_ctx: Option<ComponentExecCtx>,
203    component_ref: Option<String>,
204    provider_core_component: bool,
205}
206
207impl HostState {
208    #[allow(clippy::default_constructed_unit_structs)]
209    #[allow(clippy::too_many_arguments)]
210    pub fn new(
211        pack_id: String,
212        config: Arc<HostConfig>,
213        http_client: Arc<BlockingClient>,
214        mocks: Option<Arc<MockLayer>>,
215        session_store: Option<DynSessionStore>,
216        state_store: Option<DynStateStore>,
217        secrets: DynSecretsManager,
218        oauth_config: Option<OAuthBrokerConfig>,
219        exec_ctx: Option<ComponentExecCtx>,
220        component_ref: Option<String>,
221        provider_core_component: bool,
222    ) -> Result<Self> {
223        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
224        Ok(Self {
225            pack_id,
226            config,
227            http_client,
228            default_env,
229            session_store,
230            state_store,
231            mocks,
232            secrets,
233            oauth_config,
234            oauth_host: OAuthBrokerHost::default(),
235            exec_ctx,
236            component_ref,
237            provider_core_component,
238        })
239    }
240
241    pub fn get_secret(&self, key: &str) -> Result<String> {
242        if provider_core_only::is_enabled() {
243            bail!(provider_core_only::blocked_message("secrets"))
244        }
245        if !self.config.secrets_policy.is_allowed(key) {
246            bail!("secret {key} is not permitted by bindings policy");
247        }
248        if let Some(mock) = &self.mocks
249            && let Some(value) = mock.secrets_lookup(key)
250        {
251            return Ok(value);
252        }
253        let ctx = self.config.tenant_ctx();
254        let bytes = read_secret_blocking(&self.secrets, &ctx, key)
255            .context("failed to read secret from manager")?;
256        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
257        Ok(value)
258    }
259
260    fn allows_secret_write_in_provider_core_only(&self) -> bool {
261        self.provider_core_component || self.component_ref.is_none()
262    }
263
264    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
265        let tenant_raw = ctx
266            .as_ref()
267            .map(|ctx| ctx.tenant.clone())
268            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
269            .unwrap_or_else(|| self.config.tenant.clone());
270        let env_raw = ctx
271            .as_ref()
272            .map(|ctx| ctx.env.clone())
273            .unwrap_or_else(|| self.default_env.clone());
274        let tenant_id = TenantId::from_str(&tenant_raw)
275            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
276        let env_id = EnvId::from_str(&env_raw)
277            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
278        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
279        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
280            if let Some(team) = exec_ctx.tenant.team.as_ref() {
281                let team_id =
282                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
283                tenant_ctx = tenant_ctx.with_team(Some(team_id));
284            }
285            if let Some(user) = exec_ctx.tenant.user.as_ref() {
286                let user_id =
287                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
288                tenant_ctx = tenant_ctx.with_user(Some(user_id));
289            }
290            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
291            if let Some(node) = exec_ctx.node_id.as_ref() {
292                tenant_ctx = tenant_ctx.with_node(node.clone());
293            }
294            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
295                tenant_ctx = tenant_ctx.with_session(session.clone());
296            }
297            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
298        }
299
300        if let Some(ctx) = ctx {
301            if let Some(team) = ctx.team.or(ctx.team_id) {
302                let team_id =
303                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
304                tenant_ctx = tenant_ctx.with_team(Some(team_id));
305            }
306            if let Some(user) = ctx.user.or(ctx.user_id) {
307                let user_id =
308                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
309                tenant_ctx = tenant_ctx.with_user(Some(user_id));
310            }
311            if let Some(flow) = ctx.flow_id {
312                tenant_ctx = tenant_ctx.with_flow(flow);
313            }
314            if let Some(node) = ctx.node_id {
315                tenant_ctx = tenant_ctx.with_node(node);
316            }
317            if let Some(provider) = ctx.provider_id {
318                tenant_ctx = tenant_ctx.with_provider(provider);
319            }
320            if let Some(session) = ctx.session_id {
321                tenant_ctx = tenant_ctx.with_session(session);
322            }
323            tenant_ctx.trace_id = ctx.trace_id;
324        }
325        Ok(tenant_ctx)
326    }
327
328    fn send_http_request(
329        &mut self,
330        req: HttpRequest,
331        opts: Option<HttpRequestOptionsV1_1>,
332        _ctx: Option<HttpTenantCtx>,
333    ) -> Result<HttpResponse, HttpClientError> {
334        if !self.config.http_enabled {
335            return Err(HttpClientError {
336                code: "denied".into(),
337                message: "http client disabled by policy".into(),
338            });
339        }
340
341        let mut mock_state = None;
342        let raw_body = req.body.clone();
343        if let Some(mock) = &self.mocks
344            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
345        {
346            match mock.http_begin(&meta) {
347                HttpDecision::Mock(response) => {
348                    let headers = response
349                        .headers
350                        .iter()
351                        .map(|(k, v)| (k.clone(), v.clone()))
352                        .collect();
353                    return Ok(HttpResponse {
354                        status: response.status,
355                        headers,
356                        body: response.body.clone().map(|b| b.into_bytes()),
357                    });
358                }
359                HttpDecision::Deny(reason) => {
360                    return Err(HttpClientError {
361                        code: "denied".into(),
362                        message: reason,
363                    });
364                }
365                HttpDecision::Passthrough { record } => {
366                    mock_state = Some((meta, record));
367                }
368            }
369        }
370
371        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
372        let mut builder = self.http_client.request(method, &req.url);
373        for (key, value) in req.headers {
374            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
375                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
376            {
377                builder = builder.header(header, header_value);
378            }
379        }
380
381        if let Some(body) = raw_body.clone() {
382            builder = builder.body(body);
383        }
384
385        if let Some(opts) = opts {
386            if let Some(timeout_ms) = opts.timeout_ms {
387                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
388            }
389            if opts.allow_insecure == Some(true) {
390                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
391            }
392            if let Some(follow_redirects) = opts.follow_redirects
393                && !follow_redirects
394            {
395                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
396            }
397        }
398
399        let response = match builder.send() {
400            Ok(resp) => resp,
401            Err(err) => {
402                warn!(url = %req.url, error = %err, "http client request failed");
403                return Err(HttpClientError {
404                    code: "unavailable".into(),
405                    message: err.to_string(),
406                });
407            }
408        };
409
410        let status = response.status().as_u16();
411        let headers_vec = response
412            .headers()
413            .iter()
414            .map(|(k, v)| {
415                (
416                    k.as_str().to_string(),
417                    v.to_str().unwrap_or_default().to_string(),
418                )
419            })
420            .collect::<Vec<_>>();
421        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
422
423        if let Some((meta, true)) = mock_state.take()
424            && let Some(mock) = &self.mocks
425        {
426            let recorded = HttpMockResponse::new(
427                status,
428                headers_vec.clone().into_iter().collect(),
429                body_bytes
430                    .as_ref()
431                    .map(|b| String::from_utf8_lossy(b).into_owned()),
432            );
433            mock.http_record(&meta, &recorded);
434        }
435
436        Ok(HttpResponse {
437            status,
438            headers: headers_vec,
439            body: body_bytes,
440        })
441    }
442}
443
444impl SecretsStoreHost for HostState {
445    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
446        if provider_core_only::is_enabled() {
447            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
448            return Err(SecretsError::Denied);
449        }
450        if !self.config.secrets_policy.is_allowed(&key) {
451            return Err(SecretsError::Denied);
452        }
453        if let Some(mock) = &self.mocks
454            && let Some(value) = mock.secrets_lookup(&key)
455        {
456            return Ok(Some(value.into_bytes()));
457        }
458        let ctx = self.config.tenant_ctx();
459        match read_secret_blocking(&self.secrets, &ctx, &key) {
460            Ok(bytes) => Ok(Some(bytes)),
461            Err(err) => {
462                warn!(secret = %key, error = %err, "secret lookup failed");
463                Err(SecretsError::NotFound)
464            }
465        }
466    }
467}
468
469impl SecretsStoreHostV1_1 for HostState {
470    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
471        if provider_core_only::is_enabled() {
472            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
473            return Err(SecretsErrorV1_1::Denied);
474        }
475        if !self.config.secrets_policy.is_allowed(&key) {
476            return Err(SecretsErrorV1_1::Denied);
477        }
478        if let Some(mock) = &self.mocks
479            && let Some(value) = mock.secrets_lookup(&key)
480        {
481            return Ok(Some(value.into_bytes()));
482        }
483        let ctx = self.config.tenant_ctx();
484        match read_secret_blocking(&self.secrets, &ctx, &key) {
485            Ok(bytes) => Ok(Some(bytes)),
486            Err(err) => {
487                warn!(secret = %key, error = %err, "secret lookup failed");
488                Err(SecretsErrorV1_1::NotFound)
489            }
490        }
491    }
492
493    fn put(&mut self, key: String, value: Vec<u8>) {
494        if key.trim().is_empty() {
495            warn!(secret = %key, "secret write blocked: empty key");
496            panic!("secret write denied for key {key}: invalid key");
497        }
498        if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
499            warn!(
500                secret = %key,
501                component = self.component_ref.as_deref().unwrap_or("<pack>"),
502                "provider-core only mode enabled; blocking secrets store write"
503            );
504            panic!("secret write denied for key {key}: provider-core-only mode");
505        }
506        if !self.config.secrets_policy.is_allowed(&key) {
507            warn!(secret = %key, "secret write denied by bindings policy");
508            panic!("secret write denied for key {key}: policy");
509        }
510        let ctx = self.config.tenant_ctx();
511        if let Err(err) = write_secret_blocking(&self.secrets, &ctx, &key, &value) {
512            warn!(secret = %key, error = %err, "secret write failed");
513            panic!("secret write failed for key {key}");
514        }
515    }
516}
517
518impl HttpClientHost for HostState {
519    fn send(
520        &mut self,
521        req: HttpRequest,
522        ctx: Option<HttpTenantCtx>,
523    ) -> Result<HttpResponse, HttpClientError> {
524        self.send_http_request(req, None, ctx)
525    }
526}
527
528impl HttpClientHostV1_1 for HostState {
529    fn send(
530        &mut self,
531        req: HttpRequestV1_1,
532        opts: Option<HttpRequestOptionsV1_1>,
533        ctx: Option<HttpTenantCtxV1_1>,
534    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
535        let legacy_req = HttpRequest {
536            method: req.method,
537            url: req.url,
538            headers: req.headers,
539            body: req.body,
540        };
541        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
542            env: ctx.env,
543            tenant: ctx.tenant,
544            tenant_id: ctx.tenant_id,
545            team: ctx.team,
546            team_id: ctx.team_id,
547            user: ctx.user,
548            user_id: ctx.user_id,
549            trace_id: ctx.trace_id,
550            correlation_id: ctx.correlation_id,
551            attributes: ctx.attributes,
552            session_id: ctx.session_id,
553            flow_id: ctx.flow_id,
554            node_id: ctx.node_id,
555            provider_id: ctx.provider_id,
556            deadline_ms: ctx.deadline_ms,
557            attempt: ctx.attempt,
558            idempotency_key: ctx.idempotency_key,
559            impersonation: ctx
560                .impersonation
561                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
562                    actor_id,
563                    reason,
564                }),
565        });
566
567        self.send_http_request(legacy_req, opts, legacy_ctx)
568            .map(|resp| HttpResponseV1_1 {
569                status: resp.status,
570                headers: resp.headers,
571                body: resp.body,
572            })
573            .map_err(|err| HttpClientErrorV1_1 {
574                code: err.code,
575                message: err.message,
576            })
577    }
578}
579
580impl StateStoreHost for HostState {
581    fn read(
582        &mut self,
583        key: HostStateKey,
584        ctx: Option<StateTenantCtx>,
585    ) -> Result<Vec<u8>, StateError> {
586        let store = match self.state_store.as_ref() {
587            Some(store) => store.clone(),
588            None => {
589                return Err(StateError {
590                    code: "unavailable".into(),
591                    message: "state store not configured".into(),
592                });
593            }
594        };
595        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
596            Ok(ctx) => ctx,
597            Err(err) => {
598                return Err(StateError {
599                    code: "invalid-ctx".into(),
600                    message: err.to_string(),
601                });
602            }
603        };
604        #[cfg(feature = "fault-injection")]
605        {
606            let exec_ctx = self.exec_ctx.as_ref();
607            let flow_id = exec_ctx
608                .map(|ctx| ctx.flow_id.as_str())
609                .unwrap_or("unknown");
610            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
611            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
612            let fault_ctx = FaultContext {
613                pack_id: self.pack_id.as_str(),
614                flow_id,
615                node_id,
616                attempt,
617            };
618            if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
619                return Err(StateError {
620                    code: "internal".into(),
621                    message: err.to_string(),
622                });
623            }
624        }
625        let key = StoreStateKey::from(key);
626        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
627            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
628            Ok(None) => Err(StateError {
629                code: "not_found".into(),
630                message: "state key not found".into(),
631            }),
632            Err(err) => Err(StateError {
633                code: "internal".into(),
634                message: err.to_string(),
635            }),
636        }
637    }
638
639    fn write(
640        &mut self,
641        key: HostStateKey,
642        bytes: Vec<u8>,
643        ctx: Option<StateTenantCtx>,
644    ) -> Result<StateOpAck, StateError> {
645        let store = match self.state_store.as_ref() {
646            Some(store) => store.clone(),
647            None => {
648                return Err(StateError {
649                    code: "unavailable".into(),
650                    message: "state store not configured".into(),
651                });
652            }
653        };
654        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
655            Ok(ctx) => ctx,
656            Err(err) => {
657                return Err(StateError {
658                    code: "invalid-ctx".into(),
659                    message: err.to_string(),
660                });
661            }
662        };
663        #[cfg(feature = "fault-injection")]
664        {
665            let exec_ctx = self.exec_ctx.as_ref();
666            let flow_id = exec_ctx
667                .map(|ctx| ctx.flow_id.as_str())
668                .unwrap_or("unknown");
669            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
670            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
671            let fault_ctx = FaultContext {
672                pack_id: self.pack_id.as_str(),
673                flow_id,
674                node_id,
675                attempt,
676            };
677            if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
678                return Err(StateError {
679                    code: "internal".into(),
680                    message: err.to_string(),
681                });
682            }
683        }
684        let key = StoreStateKey::from(key);
685        let value = serde_json::from_slice(&bytes)
686            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
687        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
688            Ok(()) => Ok(StateOpAck::Ok),
689            Err(err) => Err(StateError {
690                code: "internal".into(),
691                message: err.to_string(),
692            }),
693        }
694    }
695
696    fn delete(
697        &mut self,
698        key: HostStateKey,
699        ctx: Option<StateTenantCtx>,
700    ) -> Result<StateOpAck, StateError> {
701        let store = match self.state_store.as_ref() {
702            Some(store) => store.clone(),
703            None => {
704                return Err(StateError {
705                    code: "unavailable".into(),
706                    message: "state store not configured".into(),
707                });
708            }
709        };
710        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
711            Ok(ctx) => ctx,
712            Err(err) => {
713                return Err(StateError {
714                    code: "invalid-ctx".into(),
715                    message: err.to_string(),
716                });
717            }
718        };
719        let key = StoreStateKey::from(key);
720        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
721            Ok(_) => Ok(StateOpAck::Ok),
722            Err(err) => Err(StateError {
723                code: "internal".into(),
724                message: err.to_string(),
725            }),
726        }
727    }
728}
729
730impl TelemetryLoggerHost for HostState {
731    fn log(
732        &mut self,
733        span: TelemetrySpanContext,
734        fields: Vec<(String, String)>,
735        _ctx: Option<TelemetryTenantCtx>,
736    ) -> Result<TelemetryAck, TelemetryError> {
737        if let Some(mock) = &self.mocks
738            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
739        {
740            return Ok(TelemetryAck::Ok);
741        }
742        let mut map = serde_json::Map::new();
743        for (k, v) in fields {
744            map.insert(k, Value::String(v));
745        }
746        tracing::info!(
747            tenant = %span.tenant,
748            flow_id = %span.flow_id,
749            node = ?span.node_id,
750            provider = %span.provider,
751            fields = %serde_json::Value::Object(map.clone()),
752            "telemetry log from pack"
753        );
754        Ok(TelemetryAck::Ok)
755    }
756}
757
758impl RunnerHostHttp for HostState {
759    fn request(
760        &mut self,
761        method: String,
762        url: String,
763        headers: Vec<String>,
764        body: Option<Vec<u8>>,
765    ) -> Result<Vec<u8>, String> {
766        let req = HttpRequest {
767            method,
768            url,
769            headers: headers
770                .chunks(2)
771                .filter_map(|chunk| {
772                    if chunk.len() == 2 {
773                        Some((chunk[0].clone(), chunk[1].clone()))
774                    } else {
775                        None
776                    }
777                })
778                .collect(),
779            body,
780        };
781        match HttpClientHost::send(self, req, None) {
782            Ok(resp) => Ok(resp.body.unwrap_or_default()),
783            Err(err) => Err(err.message),
784        }
785    }
786}
787
788impl RunnerHostKv for HostState {
789    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
790        None
791    }
792
793    fn put(&mut self, _ns: String, _key: String, _val: String) {}
794}
795
796enum ManifestLoad {
797    New {
798        manifest: Box<greentic_types::PackManifest>,
799        flows: PackFlows,
800    },
801    Legacy {
802        manifest: Box<legacy_pack::PackManifest>,
803        flows: PackFlows,
804    },
805}
806
807fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
808    let mut archive = ZipArchive::new(File::open(path)?)
809        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
810    let bytes = read_entry(&mut archive, "manifest.cbor")
811        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
812    match decode_pack_manifest(&bytes) {
813        Ok(manifest) => {
814            let cache = PackFlows::from_manifest(manifest.clone());
815            Ok(ManifestLoad::New {
816                manifest: Box::new(manifest),
817                flows: cache,
818            })
819        }
820        Err(err) => {
821            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
822            // Fall back to legacy pack manifest
823            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
824                .context("failed to decode legacy pack manifest from manifest.cbor")?;
825            let flows = load_legacy_flows(&mut archive, &legacy)?;
826            Ok(ManifestLoad::Legacy {
827                manifest: Box::new(legacy),
828                flows,
829            })
830        }
831    }
832}
833
834fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
835    let manifest_path = root.join("manifest.cbor");
836    let bytes = std::fs::read(&manifest_path)
837        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
838    match decode_pack_manifest(&bytes) {
839        Ok(manifest) => {
840            let cache = PackFlows::from_manifest(manifest.clone());
841            Ok(ManifestLoad::New {
842                manifest: Box::new(manifest),
843                flows: cache,
844            })
845        }
846        Err(err) => {
847            tracing::debug!(
848                error = %err,
849                pack = %root.display(),
850                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
851            );
852            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
853                .context("failed to decode legacy pack manifest from manifest.cbor")?;
854            let flows = load_legacy_flows_from_dir(root, &legacy)?;
855            Ok(ManifestLoad::Legacy {
856                manifest: Box::new(legacy),
857                flows,
858            })
859        }
860    }
861}
862
863fn load_legacy_flows(
864    archive: &mut ZipArchive<File>,
865    manifest: &legacy_pack::PackManifest,
866) -> Result<PackFlows> {
867    let mut flows = HashMap::new();
868    let mut descriptors = Vec::new();
869
870    for entry in &manifest.flows {
871        let bytes = read_entry(archive, &entry.file_json)
872            .with_context(|| format!("missing flow json {}", entry.file_json))?;
873        let doc: FlowDoc = serde_json::from_slice(&bytes)
874            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
875        let normalized = normalize_flow_doc(doc);
876        let flow_ir = flow_doc_to_ir(normalized)?;
877        let flow = flow_ir_to_flow(flow_ir)?;
878
879        descriptors.push(FlowDescriptor {
880            id: entry.id.clone(),
881            flow_type: entry.kind.clone(),
882            pack_id: manifest.meta.pack_id.clone(),
883            profile: manifest.meta.pack_id.clone(),
884            version: manifest.meta.version.to_string(),
885            description: None,
886        });
887        flows.insert(entry.id.clone(), flow);
888    }
889
890    let mut entry_flows = manifest.meta.entry_flows.clone();
891    if entry_flows.is_empty() {
892        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
893    }
894    let metadata = PackMetadata {
895        pack_id: manifest.meta.pack_id.clone(),
896        version: manifest.meta.version.to_string(),
897        entry_flows,
898        secret_requirements: Vec::new(),
899    };
900
901    Ok(PackFlows {
902        descriptors,
903        flows,
904        metadata,
905    })
906}
907
908fn load_legacy_flows_from_dir(
909    root: &Path,
910    manifest: &legacy_pack::PackManifest,
911) -> Result<PackFlows> {
912    let mut flows = HashMap::new();
913    let mut descriptors = Vec::new();
914
915    for entry in &manifest.flows {
916        let path = root.join(&entry.file_json);
917        let bytes = std::fs::read(&path)
918            .with_context(|| format!("missing flow json {}", path.display()))?;
919        let doc: FlowDoc = serde_json::from_slice(&bytes)
920            .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
921        let normalized = normalize_flow_doc(doc);
922        let flow_ir = flow_doc_to_ir(normalized)?;
923        let flow = flow_ir_to_flow(flow_ir)?;
924
925        descriptors.push(FlowDescriptor {
926            id: entry.id.clone(),
927            flow_type: entry.kind.clone(),
928            pack_id: manifest.meta.pack_id.clone(),
929            profile: manifest.meta.pack_id.clone(),
930            version: manifest.meta.version.to_string(),
931            description: None,
932        });
933        flows.insert(entry.id.clone(), flow);
934    }
935
936    let mut entry_flows = manifest.meta.entry_flows.clone();
937    if entry_flows.is_empty() {
938        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
939    }
940    let metadata = PackMetadata {
941        pack_id: manifest.meta.pack_id.clone(),
942        version: manifest.meta.version.to_string(),
943        entry_flows,
944        secret_requirements: Vec::new(),
945    };
946
947    Ok(PackFlows {
948        descriptors,
949        flows,
950        metadata,
951    })
952}
953
954pub struct ComponentState {
955    pub host: HostState,
956    wasi_ctx: WasiCtx,
957    resource_table: ResourceTable,
958}
959
960impl ComponentState {
961    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
962        let wasi_ctx = policy
963            .instantiate()
964            .context("failed to build WASI context")?;
965        Ok(Self {
966            host,
967            wasi_ctx,
968            resource_table: ResourceTable::new(),
969        })
970    }
971
972    fn host_mut(&mut self) -> &mut HostState {
973        &mut self.host
974    }
975
976    fn should_cancel_host(&mut self) -> bool {
977        false
978    }
979
980    fn yield_now_host(&mut self) {
981        // no-op cooperative yield
982    }
983}
984
985impl component_api::v0_4::greentic::component::control::Host for ComponentState {
986    fn should_cancel(&mut self) -> bool {
987        self.should_cancel_host()
988    }
989
990    fn yield_now(&mut self) {
991        self.yield_now_host();
992    }
993}
994
995impl component_api::v0_5::greentic::component::control::Host for ComponentState {
996    fn should_cancel(&mut self) -> bool {
997        self.should_cancel_host()
998    }
999
1000    fn yield_now(&mut self) {
1001        self.yield_now_host();
1002    }
1003}
1004
1005fn add_component_control_instance(
1006    linker: &mut Linker<ComponentState>,
1007    name: &str,
1008) -> wasmtime::Result<()> {
1009    let mut inst = linker.instance(name)?;
1010    inst.func_wrap(
1011        "should-cancel",
1012        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1013            let host = caller.data_mut();
1014            Ok((host.should_cancel_host(),))
1015        },
1016    )?;
1017    inst.func_wrap(
1018        "yield-now",
1019        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1020            let host = caller.data_mut();
1021            host.yield_now_host();
1022            Ok(())
1023        },
1024    )?;
1025    Ok(())
1026}
1027
1028fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1029    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1030    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1031    Ok(())
1032}
1033
1034pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1035    add_wasi_to_linker(linker)?;
1036    add_all_v1_to_linker(
1037        linker,
1038        HostFns {
1039            http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1040            http_client: Some(|state: &mut ComponentState| state.host_mut()),
1041            oauth_broker: None,
1042            runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1043            runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1044            telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
1045            state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1046            secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1047            secrets_store: None,
1048        },
1049    )?;
1050    Ok(())
1051}
1052
1053impl OAuthHostContext for ComponentState {
1054    fn tenant_id(&self) -> &str {
1055        &self.host.config.tenant
1056    }
1057
1058    fn env(&self) -> &str {
1059        &self.host.default_env
1060    }
1061
1062    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
1063        &mut self.host.oauth_host
1064    }
1065
1066    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
1067        self.host.oauth_config.as_ref()
1068    }
1069}
1070
1071impl WasiView for ComponentState {
1072    fn ctx(&mut self) -> WasiCtxView<'_> {
1073        WasiCtxView {
1074            ctx: &mut self.wasi_ctx,
1075            table: &mut self.resource_table,
1076        }
1077    }
1078}
1079
1080#[allow(unsafe_code)]
1081unsafe impl Send for ComponentState {}
1082#[allow(unsafe_code)]
1083unsafe impl Sync for ComponentState {}
1084
1085impl PackRuntime {
1086    fn allows_state_store(&self, component_ref: &str) -> bool {
1087        if self.state_store.is_none() {
1088            return false;
1089        }
1090        if !self.config.state_store_policy.allow {
1091            return false;
1092        }
1093        let Some(manifest) = self.component_manifests.get(component_ref) else {
1094            return false;
1095        };
1096        manifest
1097            .capabilities
1098            .host
1099            .state
1100            .as_ref()
1101            .map(|caps| caps.read || caps.write)
1102            .unwrap_or(false)
1103    }
1104
1105    #[allow(clippy::too_many_arguments)]
1106    pub async fn load(
1107        path: impl AsRef<Path>,
1108        config: Arc<HostConfig>,
1109        mocks: Option<Arc<MockLayer>>,
1110        archive_source: Option<&Path>,
1111        session_store: Option<DynSessionStore>,
1112        state_store: Option<DynStateStore>,
1113        wasi_policy: Arc<RunnerWasiPolicy>,
1114        secrets: DynSecretsManager,
1115        oauth_config: Option<OAuthBrokerConfig>,
1116        verify_archive: bool,
1117        component_resolution: ComponentResolution,
1118    ) -> Result<Self> {
1119        let path = path.as_ref();
1120        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1121        let path_meta = std::fs::metadata(&safe_path).ok();
1122        let is_dir = path_meta
1123            .as_ref()
1124            .map(|meta| meta.is_dir())
1125            .unwrap_or(false);
1126        let is_component = !is_dir
1127            && safe_path
1128                .extension()
1129                .and_then(|ext| ext.to_str())
1130                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1131                .unwrap_or(false);
1132        let archive_hint_path = if let Some(source) = archive_source {
1133            let (_, normalized) = normalize_pack_path(source)?;
1134            Some(normalized)
1135        } else if is_component || is_dir {
1136            None
1137        } else {
1138            Some(safe_path.clone())
1139        };
1140        let archive_hint = archive_hint_path.as_deref();
1141        if verify_archive {
1142            if let Some(verify_target) = archive_hint.and_then(|p| {
1143                std::fs::metadata(p)
1144                    .ok()
1145                    .filter(|meta| meta.is_file())
1146                    .map(|_| p)
1147            }) {
1148                verify::verify_pack(verify_target).await?;
1149                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1150            } else {
1151                tracing::debug!("skipping archive verification (no archive source)");
1152            }
1153        }
1154        let engine = Engine::default();
1155        let engine_profile =
1156            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1157        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1158        let mut metadata = PackMetadata::fallback(&safe_path);
1159        let mut manifest = None;
1160        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1161        let mut flows = None;
1162        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1163            if is_dir {
1164                Some(safe_path.clone())
1165            } else {
1166                None
1167            }
1168        });
1169
1170        if let Some(root) = materialized_root.as_ref() {
1171            match load_manifest_and_flows_from_dir(root) {
1172                Ok(ManifestLoad::New {
1173                    manifest: m,
1174                    flows: cache,
1175                }) => {
1176                    metadata = cache.metadata.clone();
1177                    manifest = Some(*m);
1178                    flows = Some(cache);
1179                }
1180                Ok(ManifestLoad::Legacy {
1181                    manifest: m,
1182                    flows: cache,
1183                }) => {
1184                    metadata = cache.metadata.clone();
1185                    legacy_manifest = Some(m);
1186                    flows = Some(cache);
1187                }
1188                Err(err) => {
1189                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1190                }
1191            }
1192        }
1193
1194        if manifest.is_none()
1195            && legacy_manifest.is_none()
1196            && let Some(archive_path) = archive_hint
1197        {
1198            match load_manifest_and_flows(archive_path) {
1199                Ok(ManifestLoad::New {
1200                    manifest: m,
1201                    flows: cache,
1202                }) => {
1203                    metadata = cache.metadata.clone();
1204                    manifest = Some(*m);
1205                    flows = Some(cache);
1206                }
1207                Ok(ManifestLoad::Legacy {
1208                    manifest: m,
1209                    flows: cache,
1210                }) => {
1211                    metadata = cache.metadata.clone();
1212                    legacy_manifest = Some(m);
1213                    flows = Some(cache);
1214                }
1215                Err(err) => {
1216                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1217                }
1218            }
1219        }
1220        #[cfg(feature = "fault-injection")]
1221        {
1222            let fault_ctx = FaultContext {
1223                pack_id: metadata.pack_id.as_str(),
1224                flow_id: "unknown",
1225                node_id: None,
1226                attempt: 1,
1227            };
1228            maybe_fail(FaultPoint::PackResolve, fault_ctx)
1229                .map_err(|err| anyhow!(err.to_string()))?;
1230        }
1231        let mut pack_lock = None;
1232        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1233            pack_lock = load_pack_lock(&root)?;
1234            if pack_lock.is_some() {
1235                break;
1236            }
1237        }
1238        let component_sources_payload = if pack_lock.is_none() {
1239            if let Some(manifest) = manifest.as_ref() {
1240                manifest
1241                    .get_component_sources_v1()
1242                    .context("invalid component sources extension")?
1243            } else {
1244                None
1245            }
1246        } else {
1247            None
1248        };
1249        let component_sources = if let Some(lock) = pack_lock.as_ref() {
1250            Some(component_sources_table_from_pack_lock(
1251                lock,
1252                component_resolution.allow_missing_hash,
1253            )?)
1254        } else {
1255            component_sources_table(component_sources_payload.as_ref())?
1256        };
1257        let components = if is_component {
1258            let wasm_bytes = fs::read(&safe_path).await?;
1259            metadata = PackMetadata::from_wasm(&wasm_bytes)
1260                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1261            let name = safe_path
1262                .file_stem()
1263                .map(|s| s.to_string_lossy().to_string())
1264                .unwrap_or_else(|| "component".to_string());
1265            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1266            let mut map = HashMap::new();
1267            map.insert(
1268                name.clone(),
1269                PackComponent {
1270                    name,
1271                    version: metadata.version.clone(),
1272                    component,
1273                },
1274            );
1275            map
1276        } else {
1277            let specs = component_specs(
1278                manifest.as_ref(),
1279                legacy_manifest.as_deref(),
1280                component_sources_payload.as_ref(),
1281                pack_lock.as_ref(),
1282            );
1283            if specs.is_empty() {
1284                HashMap::new()
1285            } else {
1286                let mut loaded = HashMap::new();
1287                let mut missing: HashSet<String> =
1288                    specs.iter().map(|spec| spec.id.clone()).collect();
1289                let mut searched = Vec::new();
1290
1291                if !component_resolution.overrides.is_empty() {
1292                    load_components_from_overrides(
1293                        &cache,
1294                        &engine,
1295                        &component_resolution.overrides,
1296                        &specs,
1297                        &mut missing,
1298                        &mut loaded,
1299                    )
1300                    .await?;
1301                    searched.push("override map".to_string());
1302                }
1303
1304                if let Some(component_sources) = component_sources.as_ref() {
1305                    load_components_from_sources(
1306                        &cache,
1307                        &engine,
1308                        component_sources,
1309                        &component_resolution,
1310                        &specs,
1311                        &mut missing,
1312                        &mut loaded,
1313                        materialized_root.as_deref(),
1314                        archive_hint,
1315                    )
1316                    .await?;
1317                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1318                }
1319
1320                if let Some(root) = materialized_root.as_ref() {
1321                    load_components_from_dir(
1322                        &cache,
1323                        &engine,
1324                        root,
1325                        &specs,
1326                        &mut missing,
1327                        &mut loaded,
1328                    )
1329                    .await?;
1330                    searched.push(format!("components dir {}", root.display()));
1331                }
1332
1333                if let Some(archive_path) = archive_hint {
1334                    load_components_from_archive(
1335                        &cache,
1336                        &engine,
1337                        archive_path,
1338                        &specs,
1339                        &mut missing,
1340                        &mut loaded,
1341                    )
1342                    .await?;
1343                    searched.push(format!("archive {}", archive_path.display()));
1344                }
1345
1346                if !missing.is_empty() {
1347                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1348                    let sources = if searched.is_empty() {
1349                        "no component sources".to_string()
1350                    } else {
1351                        searched.join(", ")
1352                    };
1353                    bail!(
1354                        "components missing: {}; looked in {}",
1355                        missing_list,
1356                        sources
1357                    );
1358                }
1359
1360                loaded
1361            }
1362        };
1363        let http_client = Arc::clone(&HTTP_CLIENT);
1364        let mut component_manifests = HashMap::new();
1365        if let Some(manifest) = manifest.as_ref() {
1366            for component in &manifest.components {
1367                component_manifests.insert(component.id.as_str().to_string(), component.clone());
1368            }
1369        }
1370        Ok(Self {
1371            path: safe_path,
1372            archive_path: archive_hint.map(Path::to_path_buf),
1373            config,
1374            engine,
1375            metadata,
1376            manifest,
1377            legacy_manifest,
1378            component_manifests,
1379            mocks,
1380            flows,
1381            components,
1382            http_client,
1383            pre_cache: Mutex::new(HashMap::new()),
1384            session_store,
1385            state_store,
1386            wasi_policy,
1387            provider_registry: RwLock::new(None),
1388            secrets,
1389            oauth_config,
1390            cache,
1391        })
1392    }
1393
1394    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1395        if let Some(cache) = &self.flows {
1396            return Ok(cache.descriptors.clone());
1397        }
1398        if let Some(manifest) = &self.manifest {
1399            let descriptors = manifest
1400                .flows
1401                .iter()
1402                .map(|flow| FlowDescriptor {
1403                    id: flow.id.as_str().to_string(),
1404                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1405                    pack_id: manifest.pack_id.as_str().to_string(),
1406                    profile: manifest.pack_id.as_str().to_string(),
1407                    version: manifest.version.to_string(),
1408                    description: None,
1409                })
1410                .collect();
1411            return Ok(descriptors);
1412        }
1413        Ok(Vec::new())
1414    }
1415
1416    #[allow(dead_code)]
1417    pub async fn run_flow(
1418        &self,
1419        flow_id: &str,
1420        input: serde_json::Value,
1421    ) -> Result<serde_json::Value> {
1422        let pack = Arc::new(
1423            PackRuntime::load(
1424                &self.path,
1425                Arc::clone(&self.config),
1426                self.mocks.clone(),
1427                self.archive_path.as_deref(),
1428                self.session_store.clone(),
1429                self.state_store.clone(),
1430                Arc::clone(&self.wasi_policy),
1431                self.secrets.clone(),
1432                self.oauth_config.clone(),
1433                false,
1434                ComponentResolution::default(),
1435            )
1436            .await?,
1437        );
1438
1439        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1440        let retry_config = self.config.retry_config().into();
1441        let mocks = pack.mocks.as_deref();
1442        let tenant = self.config.tenant.as_str();
1443
1444        let ctx = FlowContext {
1445            tenant,
1446            pack_id: pack.metadata().pack_id.as_str(),
1447            flow_id,
1448            node_id: None,
1449            tool: None,
1450            action: None,
1451            session_id: None,
1452            provider_id: None,
1453            retry_config,
1454            attempt: 1,
1455            observer: None,
1456            mocks,
1457        };
1458
1459        let execution = engine.execute(ctx, input).await?;
1460        match execution.status {
1461            FlowStatus::Completed => Ok(execution.output),
1462            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1463                "status": "pending",
1464                "reason": wait.reason,
1465                "resume": wait.snapshot,
1466                "response": execution.output,
1467            })),
1468        }
1469    }
1470
1471    pub async fn invoke_component(
1472        &self,
1473        component_ref: &str,
1474        ctx: ComponentExecCtx,
1475        operation: &str,
1476        _config_json: Option<String>,
1477        input_json: String,
1478    ) -> Result<Value> {
1479        let pack_component = self
1480            .components
1481            .get(component_ref)
1482            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1483
1484        let mut linker = Linker::new(&self.engine);
1485        let allow_state_store = self.allows_state_store(component_ref);
1486        register_all(&mut linker, allow_state_store)?;
1487        add_component_control_to_linker(&mut linker)?;
1488
1489        let host_state = HostState::new(
1490            self.metadata().pack_id.clone(),
1491            Arc::clone(&self.config),
1492            Arc::clone(&self.http_client),
1493            self.mocks.clone(),
1494            self.session_store.clone(),
1495            self.state_store.clone(),
1496            Arc::clone(&self.secrets),
1497            self.oauth_config.clone(),
1498            Some(ctx.clone()),
1499            Some(component_ref.to_string()),
1500            false,
1501        )?;
1502        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1503        let mut store = wasmtime::Store::new(&self.engine, store_state);
1504        let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1505        let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1506            Ok(pre) => {
1507                let bindings = pre.instantiate_async(&mut store).await?;
1508                let node = bindings.greentic_component_node();
1509                let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1510                let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1511                component_api::invoke_result_from_v0_5(result)
1512            }
1513            Err(err) => {
1514                if is_missing_node_export(&err, "0.5.0") {
1515                    let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1516                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
1517                        component_api::v0_4::ComponentPre::new(pre_instance)?;
1518                    let bindings = pre.instantiate_async(&mut store).await?;
1519                    let node = bindings.greentic_component_node();
1520                    let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1521                    let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1522                    component_api::invoke_result_from_v0_4(result)
1523                } else {
1524                    return Err(err);
1525                }
1526            }
1527        };
1528
1529        match result {
1530            InvokeResult::Ok(body) => {
1531                if body.is_empty() {
1532                    return Ok(Value::Null);
1533                }
1534                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1535            }
1536            InvokeResult::Err(NodeError {
1537                code,
1538                message,
1539                retryable,
1540                backoff_ms,
1541                details,
1542            }) => {
1543                let mut obj = serde_json::Map::new();
1544                obj.insert("ok".into(), Value::Bool(false));
1545                let mut error = serde_json::Map::new();
1546                error.insert("code".into(), Value::String(code));
1547                error.insert("message".into(), Value::String(message));
1548                error.insert("retryable".into(), Value::Bool(retryable));
1549                if let Some(backoff) = backoff_ms {
1550                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1551                }
1552                if let Some(details) = details {
1553                    error.insert(
1554                        "details".into(),
1555                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1556                    );
1557                }
1558                obj.insert("error".into(), Value::Object(error));
1559                Ok(Value::Object(obj))
1560            }
1561        }
1562    }
1563
1564    pub fn resolve_provider(
1565        &self,
1566        provider_id: Option<&str>,
1567        provider_type: Option<&str>,
1568    ) -> Result<ProviderBinding> {
1569        let registry = self.provider_registry()?;
1570        registry.resolve(provider_id, provider_type)
1571    }
1572
1573    pub async fn invoke_provider(
1574        &self,
1575        binding: &ProviderBinding,
1576        ctx: ComponentExecCtx,
1577        op: &str,
1578        input_json: Vec<u8>,
1579    ) -> Result<Value> {
1580        let component_ref = &binding.component_ref;
1581        let pack_component = self
1582            .components
1583            .get(component_ref)
1584            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1585
1586        let mut linker = Linker::new(&self.engine);
1587        let allow_state_store = self.allows_state_store(component_ref);
1588        register_all(&mut linker, allow_state_store)?;
1589        add_component_control_to_linker(&mut linker)?;
1590        let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1591        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1592
1593        let host_state = HostState::new(
1594            self.metadata().pack_id.clone(),
1595            Arc::clone(&self.config),
1596            Arc::clone(&self.http_client),
1597            self.mocks.clone(),
1598            self.session_store.clone(),
1599            self.state_store.clone(),
1600            Arc::clone(&self.secrets),
1601            self.oauth_config.clone(),
1602            Some(ctx),
1603            Some(component_ref.to_string()),
1604            true,
1605        )?;
1606        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1607        let mut store = wasmtime::Store::new(&self.engine, store_state);
1608        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1609        let provider = bindings.greentic_provider_core_schema_core_api();
1610
1611        let result = provider.call_invoke(&mut store, op, &input_json)?;
1612        deserialize_json_bytes(result)
1613    }
1614
1615    fn provider_registry(&self) -> Result<ProviderRegistry> {
1616        if let Some(registry) = self.provider_registry.read().clone() {
1617            return Ok(registry);
1618        }
1619        let manifest = self
1620            .manifest
1621            .as_ref()
1622            .context("pack manifest required for provider resolution")?;
1623        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1624        let registry = ProviderRegistry::new(
1625            manifest,
1626            self.state_store.clone(),
1627            &self.config.tenant,
1628            &env,
1629        )?;
1630        *self.provider_registry.write() = Some(registry.clone());
1631        Ok(registry)
1632    }
1633
1634    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1635        if let Some(cache) = &self.flows {
1636            return cache
1637                .flows
1638                .get(flow_id)
1639                .cloned()
1640                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1641        }
1642        if let Some(manifest) = &self.manifest {
1643            let entry = manifest
1644                .flows
1645                .iter()
1646                .find(|f| f.id.as_str() == flow_id)
1647                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1648            return Ok(entry.flow.clone());
1649        }
1650        bail!("flow '{flow_id}' not available (pack exports disabled)")
1651    }
1652
1653    pub fn metadata(&self) -> &PackMetadata {
1654        &self.metadata
1655    }
1656
1657    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1658        &self.metadata.secret_requirements
1659    }
1660
1661    pub fn missing_secrets(
1662        &self,
1663        tenant_ctx: &TypesTenantCtx,
1664    ) -> Vec<greentic_types::SecretRequirement> {
1665        let env = tenant_ctx.env.as_str().to_string();
1666        let tenant = tenant_ctx.tenant.as_str().to_string();
1667        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1668        self.required_secrets()
1669            .iter()
1670            .filter(|req| {
1671                // scope must match current context if provided
1672                if let Some(scope) = &req.scope {
1673                    if scope.env != env {
1674                        return false;
1675                    }
1676                    if scope.tenant != tenant {
1677                        return false;
1678                    }
1679                    if let Some(ref team_req) = scope.team
1680                        && team.as_ref() != Some(team_req)
1681                    {
1682                        return false;
1683                    }
1684                }
1685                let ctx = self.config.tenant_ctx();
1686                read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1687            })
1688            .cloned()
1689            .collect()
1690    }
1691
1692    pub fn for_component_test(
1693        components: Vec<(String, PathBuf)>,
1694        flows: HashMap<String, FlowIR>,
1695        pack_id: &str,
1696        config: Arc<HostConfig>,
1697    ) -> Result<Self> {
1698        let engine = Engine::default();
1699        let engine_profile =
1700            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1701        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1702        let mut component_map = HashMap::new();
1703        for (name, path) in components {
1704            if !path.exists() {
1705                bail!("component artifact missing: {}", path.display());
1706            }
1707            let wasm_bytes = std::fs::read(&path)?;
1708            let component = Arc::new(
1709                Component::from_binary(&engine, &wasm_bytes)
1710                    .with_context(|| format!("failed to compile component {}", path.display()))?,
1711            );
1712            component_map.insert(
1713                name.clone(),
1714                PackComponent {
1715                    name,
1716                    version: "0.0.0".into(),
1717                    component,
1718                },
1719            );
1720        }
1721
1722        let mut flow_map = HashMap::new();
1723        let mut descriptors = Vec::new();
1724        for (id, ir) in flows {
1725            let flow_type = ir.flow_type.clone();
1726            let flow = flow_ir_to_flow(ir)?;
1727            flow_map.insert(id.clone(), flow);
1728            descriptors.push(FlowDescriptor {
1729                id: id.clone(),
1730                flow_type,
1731                pack_id: pack_id.to_string(),
1732                profile: "test".into(),
1733                version: "0.0.0".into(),
1734                description: None,
1735            });
1736        }
1737        let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1738        let metadata = PackMetadata {
1739            pack_id: pack_id.to_string(),
1740            version: "0.0.0".into(),
1741            entry_flows,
1742            secret_requirements: Vec::new(),
1743        };
1744        let flows_cache = PackFlows {
1745            descriptors: descriptors.clone(),
1746            flows: flow_map,
1747            metadata: metadata.clone(),
1748        };
1749
1750        Ok(Self {
1751            path: PathBuf::new(),
1752            archive_path: None,
1753            config,
1754            engine,
1755            metadata,
1756            manifest: None,
1757            legacy_manifest: None,
1758            component_manifests: HashMap::new(),
1759            mocks: None,
1760            flows: Some(flows_cache),
1761            components: component_map,
1762            http_client: Arc::clone(&HTTP_CLIENT),
1763            pre_cache: Mutex::new(HashMap::new()),
1764            session_store: None,
1765            state_store: None,
1766            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1767            provider_registry: RwLock::new(None),
1768            secrets: crate::secrets::default_manager()?,
1769            oauth_config: None,
1770            cache,
1771        })
1772    }
1773}
1774
1775fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1776    let message = err.to_string();
1777    message.contains("no exported instance named")
1778        && message.contains(&format!("greentic:component/node@{version}"))
1779}
1780
1781struct PackFlows {
1782    descriptors: Vec<FlowDescriptor>,
1783    flows: HashMap<String, Flow>,
1784    metadata: PackMetadata,
1785}
1786
1787const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
1788    "greentic.pack.runtime_flow",
1789    "greentic.pack.flow_runtime",
1790    "greentic.pack.runtime_flows",
1791];
1792
1793#[derive(Debug, Deserialize)]
1794struct RuntimeFlowBundle {
1795    flows: Vec<RuntimeFlow>,
1796}
1797
1798#[derive(Debug, Deserialize)]
1799struct RuntimeFlow {
1800    id: String,
1801    #[serde(alias = "flow_type")]
1802    kind: FlowKind,
1803    #[serde(default)]
1804    schema_version: Option<String>,
1805    #[serde(default)]
1806    start: Option<String>,
1807    #[serde(default)]
1808    entrypoints: BTreeMap<String, Value>,
1809    nodes: BTreeMap<String, RuntimeNode>,
1810    #[serde(default)]
1811    metadata: Option<FlowMetadata>,
1812}
1813
1814#[derive(Debug, Deserialize)]
1815struct RuntimeNode {
1816    #[serde(alias = "component")]
1817    component_id: String,
1818    #[serde(default, alias = "operation")]
1819    operation_name: Option<String>,
1820    #[serde(default, alias = "payload", alias = "input")]
1821    operation_payload: Value,
1822    #[serde(default)]
1823    routing: Option<Routing>,
1824    #[serde(default)]
1825    telemetry: Option<TelemetryHints>,
1826}
1827
1828fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1829    if bytes.is_empty() {
1830        return Ok(Value::Null);
1831    }
1832    serde_json::from_slice(&bytes).or_else(|_| {
1833        String::from_utf8(bytes)
1834            .map(Value::String)
1835            .map_err(|err| anyhow!(err))
1836    })
1837}
1838
1839impl PackFlows {
1840    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1841        if let Some(flows) = flows_from_runtime_extension(&manifest) {
1842            return flows;
1843        }
1844        let descriptors = manifest
1845            .flows
1846            .iter()
1847            .map(|entry| FlowDescriptor {
1848                id: entry.id.as_str().to_string(),
1849                flow_type: flow_kind_to_str(entry.kind).to_string(),
1850                pack_id: manifest.pack_id.as_str().to_string(),
1851                profile: manifest.pack_id.as_str().to_string(),
1852                version: manifest.version.to_string(),
1853                description: None,
1854            })
1855            .collect();
1856        let mut flows = HashMap::new();
1857        for entry in &manifest.flows {
1858            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1859        }
1860        Self {
1861            metadata: PackMetadata::from_manifest(&manifest),
1862            descriptors,
1863            flows,
1864        }
1865    }
1866}
1867
1868fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1869    let extensions = manifest.extensions.as_ref()?;
1870    let extension = extensions.iter().find_map(|(key, ext)| {
1871        if RUNTIME_FLOW_EXTENSION_IDS
1872            .iter()
1873            .any(|candidate| candidate == key)
1874        {
1875            Some(ext)
1876        } else {
1877            None
1878        }
1879    })?;
1880    let runtime_flows = match decode_runtime_flow_extension(extension) {
1881        Some(flows) if !flows.is_empty() => flows,
1882        _ => return None,
1883    };
1884
1885    let descriptors = runtime_flows
1886        .iter()
1887        .map(|flow| FlowDescriptor {
1888            id: flow.id.as_str().to_string(),
1889            flow_type: flow_kind_to_str(flow.kind).to_string(),
1890            pack_id: manifest.pack_id.as_str().to_string(),
1891            profile: manifest.pack_id.as_str().to_string(),
1892            version: manifest.version.to_string(),
1893            description: None,
1894        })
1895        .collect::<Vec<_>>();
1896    let flows = runtime_flows
1897        .into_iter()
1898        .map(|flow| (flow.id.as_str().to_string(), flow))
1899        .collect();
1900
1901    Some(PackFlows {
1902        metadata: PackMetadata::from_manifest(manifest),
1903        descriptors,
1904        flows,
1905    })
1906}
1907
1908fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1909    let value = match extension.inline.as_ref()? {
1910        ExtensionInline::Other(value) => value.clone(),
1911        _ => return None,
1912    };
1913
1914    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1915        return Some(collect_runtime_flows(bundle.flows));
1916    }
1917
1918    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1919        return Some(collect_runtime_flows(flows));
1920    }
1921
1922    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1923        return Some(flows);
1924    }
1925
1926    warn!(
1927        extension = %extension.kind,
1928        version = %extension.version,
1929        "runtime flow extension present but could not be decoded"
1930    );
1931    None
1932}
1933
1934fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1935    flows
1936        .into_iter()
1937        .filter_map(|flow| match runtime_flow_to_flow(flow) {
1938            Ok(flow) => Some(flow),
1939            Err(err) => {
1940                warn!(error = %err, "failed to decode runtime flow");
1941                None
1942            }
1943        })
1944        .collect()
1945}
1946
1947fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1948    let flow_id = FlowId::from_str(&runtime.id)
1949        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1950    let mut entrypoints = runtime.entrypoints;
1951    if entrypoints.is_empty()
1952        && let Some(start) = &runtime.start
1953    {
1954        entrypoints.insert("default".into(), Value::String(start.clone()));
1955    }
1956
1957    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1958    for (id, node) in runtime.nodes {
1959        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1960        let component_id = ComponentId::from_str(&node.component_id)
1961            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1962        let component = FlowComponentRef {
1963            id: component_id,
1964            pack_alias: None,
1965            operation: node.operation_name,
1966        };
1967        let routing = node.routing.unwrap_or(Routing::End);
1968        let telemetry = node.telemetry.unwrap_or_default();
1969        nodes.insert(
1970            node_id.clone(),
1971            Node {
1972                id: node_id,
1973                component,
1974                input: InputMapping {
1975                    mapping: node.operation_payload,
1976                },
1977                output: OutputMapping {
1978                    mapping: Value::Null,
1979                },
1980                routing,
1981                telemetry,
1982            },
1983        );
1984    }
1985
1986    Ok(Flow {
1987        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1988        id: flow_id,
1989        kind: runtime.kind,
1990        entrypoints,
1991        nodes,
1992        metadata: runtime.metadata.unwrap_or_default(),
1993    })
1994}
1995
1996fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1997    match kind {
1998        greentic_types::FlowKind::Messaging => "messaging",
1999        greentic_types::FlowKind::Event => "event",
2000        greentic_types::FlowKind::ComponentConfig => "component-config",
2001        greentic_types::FlowKind::Job => "job",
2002        greentic_types::FlowKind::Http => "http",
2003    }
2004}
2005
2006fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
2007    let mut file = archive
2008        .by_name(name)
2009        .with_context(|| format!("entry {name} missing from archive"))?;
2010    let mut buf = Vec::new();
2011    file.read_to_end(&mut buf)?;
2012    Ok(buf)
2013}
2014
2015fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
2016    for node in doc.nodes.values_mut() {
2017        let Some((component_ref, payload)) = node
2018            .raw
2019            .iter()
2020            .next()
2021            .map(|(key, value)| (key.clone(), value.clone()))
2022        else {
2023            continue;
2024        };
2025        if component_ref.starts_with("emit.") {
2026            node.operation = Some(component_ref);
2027            node.payload = payload;
2028            node.raw.clear();
2029            continue;
2030        }
2031        let (target_component, operation, input, config) =
2032            infer_component_exec(&payload, &component_ref);
2033        let mut payload_obj = serde_json::Map::new();
2034        // component.exec is meta; ensure the payload carries the actual target component.
2035        payload_obj.insert("component".into(), Value::String(target_component));
2036        payload_obj.insert("operation".into(), Value::String(operation));
2037        payload_obj.insert("input".into(), input);
2038        if let Some(cfg) = config {
2039            payload_obj.insert("config".into(), cfg);
2040        }
2041        node.operation = Some("component.exec".to_string());
2042        node.payload = Value::Object(payload_obj);
2043        node.raw.clear();
2044    }
2045    doc
2046}
2047
2048fn infer_component_exec(
2049    payload: &Value,
2050    component_ref: &str,
2051) -> (String, String, Value, Option<Value>) {
2052    let default_op = if component_ref.starts_with("templating.") {
2053        "render"
2054    } else {
2055        "invoke"
2056    }
2057    .to_string();
2058
2059    if let Value::Object(map) = payload {
2060        let op = map
2061            .get("op")
2062            .or_else(|| map.get("operation"))
2063            .and_then(Value::as_str)
2064            .map(|s| s.to_string())
2065            .unwrap_or_else(|| default_op.clone());
2066
2067        let mut input = map.clone();
2068        let config = input.remove("config");
2069        let component = input
2070            .get("component")
2071            .or_else(|| input.get("component_ref"))
2072            .and_then(Value::as_str)
2073            .map(|s| s.to_string())
2074            .unwrap_or_else(|| component_ref.to_string());
2075        input.remove("component");
2076        input.remove("component_ref");
2077        input.remove("op");
2078        input.remove("operation");
2079        return (component, op, Value::Object(input), config);
2080    }
2081
2082    (component_ref.to_string(), default_op, payload.clone(), None)
2083}
2084
2085#[derive(Clone, Debug)]
2086struct ComponentSpec {
2087    id: String,
2088    version: String,
2089    legacy_path: Option<String>,
2090}
2091
2092#[derive(Clone, Debug)]
2093struct ComponentSourceInfo {
2094    digest: Option<String>,
2095    source: ComponentSourceRef,
2096    artifact: ComponentArtifactLocation,
2097    expected_wasm_sha256: Option<String>,
2098    skip_digest_verification: bool,
2099}
2100
2101#[derive(Clone, Debug)]
2102enum ComponentArtifactLocation {
2103    Inline { wasm_path: String },
2104    Remote,
2105}
2106
2107#[derive(Clone, Debug, Deserialize)]
2108struct PackLockV1 {
2109    schema_version: u32,
2110    components: Vec<PackLockComponent>,
2111}
2112
2113#[derive(Clone, Debug, Deserialize)]
2114struct PackLockComponent {
2115    name: String,
2116    #[serde(default, rename = "source_ref")]
2117    source_ref: Option<String>,
2118    #[serde(default, rename = "ref")]
2119    legacy_ref: Option<String>,
2120    #[serde(default)]
2121    component_id: Option<ComponentId>,
2122    #[serde(default)]
2123    bundled: Option<bool>,
2124    #[serde(default, rename = "bundled_path")]
2125    bundled_path: Option<String>,
2126    #[serde(default, rename = "path")]
2127    legacy_path: Option<String>,
2128    #[serde(default)]
2129    wasm_sha256: Option<String>,
2130    #[serde(default, rename = "sha256")]
2131    legacy_sha256: Option<String>,
2132    #[serde(default)]
2133    resolved_digest: Option<String>,
2134    #[serde(default)]
2135    digest: Option<String>,
2136}
2137
2138fn component_specs(
2139    manifest: Option<&greentic_types::PackManifest>,
2140    legacy_manifest: Option<&legacy_pack::PackManifest>,
2141    component_sources: Option<&ComponentSourcesV1>,
2142    pack_lock: Option<&PackLockV1>,
2143) -> Vec<ComponentSpec> {
2144    if let Some(manifest) = manifest {
2145        if !manifest.components.is_empty() {
2146            return manifest
2147                .components
2148                .iter()
2149                .map(|entry| ComponentSpec {
2150                    id: entry.id.as_str().to_string(),
2151                    version: entry.version.to_string(),
2152                    legacy_path: None,
2153                })
2154                .collect();
2155        }
2156        if let Some(lock) = pack_lock {
2157            let mut seen = HashSet::new();
2158            let mut specs = Vec::new();
2159            for entry in &lock.components {
2160                let id = entry
2161                    .component_id
2162                    .as_ref()
2163                    .map(|id| id.as_str())
2164                    .unwrap_or(entry.name.as_str());
2165                if seen.insert(id.to_string()) {
2166                    specs.push(ComponentSpec {
2167                        id: id.to_string(),
2168                        version: "0.0.0".to_string(),
2169                        legacy_path: None,
2170                    });
2171                }
2172            }
2173            return specs;
2174        }
2175        if let Some(sources) = component_sources {
2176            let mut seen = HashSet::new();
2177            let mut specs = Vec::new();
2178            for entry in &sources.components {
2179                let id = entry
2180                    .component_id
2181                    .as_ref()
2182                    .map(|id| id.as_str())
2183                    .unwrap_or(entry.name.as_str());
2184                if seen.insert(id.to_string()) {
2185                    specs.push(ComponentSpec {
2186                        id: id.to_string(),
2187                        version: "0.0.0".to_string(),
2188                        legacy_path: None,
2189                    });
2190                }
2191            }
2192            return specs;
2193        }
2194    }
2195    if let Some(legacy_manifest) = legacy_manifest {
2196        return legacy_manifest
2197            .components
2198            .iter()
2199            .map(|entry| ComponentSpec {
2200                id: entry.name.clone(),
2201                version: entry.version.to_string(),
2202                legacy_path: Some(entry.file_wasm.clone()),
2203            })
2204            .collect();
2205    }
2206    Vec::new()
2207}
2208
2209fn component_sources_table(
2210    sources: Option<&ComponentSourcesV1>,
2211) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2212    let Some(sources) = sources else {
2213        return Ok(None);
2214    };
2215    let mut table = HashMap::new();
2216    for entry in &sources.components {
2217        let artifact = match &entry.artifact {
2218            ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2219                wasm_path: wasm_path.clone(),
2220            },
2221            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2222        };
2223        let info = ComponentSourceInfo {
2224            digest: Some(entry.resolved.digest.clone()),
2225            source: entry.source.clone(),
2226            artifact,
2227            expected_wasm_sha256: None,
2228            skip_digest_verification: false,
2229        };
2230        if let Some(component_id) = entry.component_id.as_ref() {
2231            table.insert(component_id.as_str().to_string(), info.clone());
2232        }
2233        table.insert(entry.name.clone(), info);
2234    }
2235    Ok(Some(table))
2236}
2237
2238fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2239    let lock_path = if path.is_dir() {
2240        let candidate = path.join("pack.lock");
2241        if candidate.exists() {
2242            Some(candidate)
2243        } else {
2244            let candidate = path.join("pack.lock.json");
2245            candidate.exists().then_some(candidate)
2246        }
2247    } else {
2248        None
2249    };
2250    let Some(lock_path) = lock_path else {
2251        return Ok(None);
2252    };
2253    let raw = std::fs::read_to_string(&lock_path)
2254        .with_context(|| format!("failed to read {}", lock_path.display()))?;
2255    let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2256    if lock.schema_version != 1 {
2257        bail!("pack.lock schema_version must be 1");
2258    }
2259    Ok(Some(lock))
2260}
2261
2262fn find_pack_lock_roots(
2263    pack_path: &Path,
2264    is_dir: bool,
2265    archive_hint: Option<&Path>,
2266) -> Vec<PathBuf> {
2267    if is_dir {
2268        return vec![pack_path.to_path_buf()];
2269    }
2270    let mut roots = Vec::new();
2271    if let Some(archive_path) = archive_hint {
2272        if let Some(parent) = archive_path.parent() {
2273            roots.push(parent.to_path_buf());
2274            if let Some(grandparent) = parent.parent() {
2275                roots.push(grandparent.to_path_buf());
2276            }
2277        }
2278    } else if let Some(parent) = pack_path.parent() {
2279        roots.push(parent.to_path_buf());
2280        if let Some(grandparent) = parent.parent() {
2281            roots.push(grandparent.to_path_buf());
2282        }
2283    }
2284    roots
2285}
2286
2287fn normalize_sha256(digest: &str) -> Result<String> {
2288    let trimmed = digest.trim();
2289    if trimmed.is_empty() {
2290        bail!("sha256 digest cannot be empty");
2291    }
2292    if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2293        if stripped.is_empty() {
2294            bail!("sha256 digest must include hex bytes after sha256:");
2295        }
2296        return Ok(trimmed.to_string());
2297    }
2298    if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2299        return Ok(format!("sha256:{trimmed}"));
2300    }
2301    bail!("sha256 digest must be hex or sha256:<hex>");
2302}
2303
2304fn component_sources_table_from_pack_lock(
2305    lock: &PackLockV1,
2306    allow_missing_hash: bool,
2307) -> Result<HashMap<String, ComponentSourceInfo>> {
2308    let mut table = HashMap::new();
2309    let mut names = HashSet::new();
2310    for entry in &lock.components {
2311        if !names.insert(entry.name.clone()) {
2312            bail!(
2313                "pack.lock contains duplicate component name `{}`",
2314                entry.name
2315            );
2316        }
2317        let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2318            (Some(primary), Some(legacy)) => {
2319                if primary != legacy {
2320                    bail!(
2321                        "pack.lock component {} has conflicting refs: {} vs {}",
2322                        entry.name,
2323                        primary,
2324                        legacy
2325                    );
2326                }
2327                primary.as_str()
2328            }
2329            (Some(primary), None) => primary.as_str(),
2330            (None, Some(legacy)) => legacy.as_str(),
2331            (None, None) => {
2332                bail!("pack.lock component {} missing source_ref", entry.name);
2333            }
2334        };
2335        let source: ComponentSourceRef = source_ref
2336            .parse()
2337            .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2338        let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2339            (Some(primary), Some(legacy)) => {
2340                if primary != legacy {
2341                    bail!(
2342                        "pack.lock component {} has conflicting bundled paths: {} vs {}",
2343                        entry.name,
2344                        primary,
2345                        legacy
2346                    );
2347                }
2348                Some(primary.clone())
2349            }
2350            (Some(primary), None) => Some(primary.clone()),
2351            (None, Some(legacy)) => Some(legacy.clone()),
2352            (None, None) => None,
2353        };
2354        let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2355        let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2356            let wasm_path = bundled_path.ok_or_else(|| {
2357                anyhow!(
2358                    "pack.lock component {} marked bundled but bundled_path is missing",
2359                    entry.name
2360                )
2361            })?;
2362            let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2363                (Some(primary), Some(legacy)) => {
2364                    if primary != legacy {
2365                        bail!(
2366                            "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2367                            entry.name,
2368                            primary,
2369                            legacy
2370                        );
2371                    }
2372                    Some(primary.as_str())
2373                }
2374                (Some(primary), None) => Some(primary.as_str()),
2375                (None, Some(legacy)) => Some(legacy.as_str()),
2376                (None, None) => None,
2377            };
2378            let expected = match expected_raw {
2379                Some(value) => Some(normalize_sha256(value)?),
2380                None => None,
2381            };
2382            if expected.is_none() && !allow_missing_hash {
2383                bail!(
2384                    "pack.lock component {} missing wasm_sha256 for bundled component",
2385                    entry.name
2386                );
2387            }
2388            (
2389                ComponentArtifactLocation::Inline { wasm_path },
2390                expected.clone(),
2391                expected,
2392                allow_missing_hash && expected_raw.is_none(),
2393            )
2394        } else {
2395            if source.is_tag() {
2396                bail!(
2397                    "component {} uses tag ref {} but is not bundled; rebuild the pack",
2398                    entry.name,
2399                    source
2400                );
2401            }
2402            let expected = entry
2403                .resolved_digest
2404                .as_deref()
2405                .or(entry.digest.as_deref())
2406                .ok_or_else(|| {
2407                    anyhow!(
2408                        "pack.lock component {} missing resolved_digest for remote component",
2409                        entry.name
2410                    )
2411                })?;
2412            (
2413                ComponentArtifactLocation::Remote,
2414                Some(normalize_digest(expected)),
2415                None,
2416                false,
2417            )
2418        };
2419        let info = ComponentSourceInfo {
2420            digest,
2421            source,
2422            artifact,
2423            expected_wasm_sha256,
2424            skip_digest_verification,
2425        };
2426        if let Some(component_id) = entry.component_id.as_ref() {
2427            let key = component_id.as_str().to_string();
2428            if table.contains_key(&key) {
2429                bail!(
2430                    "pack.lock contains duplicate component id `{}`",
2431                    component_id.as_str()
2432                );
2433            }
2434            table.insert(key, info.clone());
2435        }
2436        if entry.name
2437            != entry
2438                .component_id
2439                .as_ref()
2440                .map(|id| id.as_str())
2441                .unwrap_or("")
2442        {
2443            table.insert(entry.name.clone(), info);
2444        }
2445    }
2446    Ok(table)
2447}
2448
2449fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2450    if let Some(path) = &spec.legacy_path {
2451        return root.join(path);
2452    }
2453    root.join("components").join(format!("{}.wasm", spec.id))
2454}
2455
2456fn normalize_digest(digest: &str) -> String {
2457    if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
2458        digest.to_string()
2459    } else {
2460        format!("sha256:{digest}")
2461    }
2462}
2463
2464fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2465    if digest.starts_with("blake3:") {
2466        let hash = blake3::hash(bytes);
2467        return Ok(format!("blake3:{}", hash.to_hex()));
2468    }
2469    let mut hasher = sha2::Sha256::new();
2470    hasher.update(bytes);
2471    Ok(format!("sha256:{:x}", hasher.finalize()))
2472}
2473
2474fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2475    let mut hasher = sha2::Sha256::new();
2476    hasher.update(bytes);
2477    format!("sha256:{:x}", hasher.finalize())
2478}
2479
2480fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2481    let wasm_digest = digest
2482        .map(normalize_digest)
2483        .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2484    ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2485}
2486
2487async fn compile_component_with_cache(
2488    cache: &CacheManager,
2489    engine: &Engine,
2490    digest: Option<&str>,
2491    bytes: Vec<u8>,
2492) -> Result<Arc<Component>> {
2493    let key = build_artifact_key(cache, digest, &bytes);
2494    cache.get_component(engine, &key, || Ok(bytes)).await
2495}
2496
2497fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2498    let normalized_expected = normalize_digest(expected);
2499    let actual = compute_digest_for(bytes, &normalized_expected)?;
2500    if normalize_digest(&actual) != normalized_expected {
2501        bail!(
2502            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2503        );
2504    }
2505    Ok(())
2506}
2507
2508fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2509    let normalized_expected = normalize_sha256(expected)?;
2510    let actual = compute_sha256_digest_for(bytes);
2511    if actual != normalized_expected {
2512        bail!(
2513            "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2514        );
2515    }
2516    Ok(())
2517}
2518
2519#[cfg(test)]
2520mod pack_lock_tests {
2521    use super::*;
2522    use tempfile::TempDir;
2523
2524    #[test]
2525    fn pack_lock_tag_ref_requires_bundle() {
2526        let lock = PackLockV1 {
2527            schema_version: 1,
2528            components: vec![PackLockComponent {
2529                name: "templates".to_string(),
2530                source_ref: Some("oci://registry.test/templates:latest".to_string()),
2531                legacy_ref: None,
2532                component_id: None,
2533                bundled: Some(false),
2534                bundled_path: None,
2535                legacy_path: None,
2536                wasm_sha256: None,
2537                legacy_sha256: None,
2538                resolved_digest: None,
2539                digest: None,
2540            }],
2541        };
2542        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
2543        assert!(
2544            err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
2545            "unexpected error: {err}"
2546        );
2547    }
2548
2549    #[test]
2550    fn bundled_hash_mismatch_errors() {
2551        let rt = tokio::runtime::Runtime::new().expect("runtime");
2552        let temp = TempDir::new().expect("temp dir");
2553        let engine = Engine::default();
2554        let engine_profile =
2555            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2556        let cache_config = CacheConfig {
2557            root: temp.path().join("cache"),
2558            ..CacheConfig::default()
2559        };
2560        let cache = CacheManager::new(cache_config, engine_profile);
2561        let wasm_path = temp.path().join("component.wasm");
2562        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
2563            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
2564        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
2565        std::fs::write(&wasm_path, &bytes).expect("write temp wasm");
2566
2567        let spec = ComponentSpec {
2568            id: "qa.process".to_string(),
2569            version: "0.0.0".to_string(),
2570            legacy_path: None,
2571        };
2572        let mut missing = HashSet::new();
2573        missing.insert(spec.id.clone());
2574
2575        let mut sources = HashMap::new();
2576        sources.insert(
2577            spec.id.clone(),
2578            ComponentSourceInfo {
2579                digest: Some("sha256:deadbeef".to_string()),
2580                source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
2581                artifact: ComponentArtifactLocation::Inline {
2582                    wasm_path: "component.wasm".to_string(),
2583                },
2584                expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
2585                skip_digest_verification: false,
2586            },
2587        );
2588
2589        let mut loaded = HashMap::new();
2590        let result = rt.block_on(load_components_from_sources(
2591            &cache,
2592            &engine,
2593            &sources,
2594            &ComponentResolution::default(),
2595            &[spec],
2596            &mut missing,
2597            &mut loaded,
2598            Some(temp.path()),
2599            None,
2600        ));
2601        let err = result.unwrap_err();
2602        assert!(
2603            err.to_string().contains("bundled digest mismatch"),
2604            "unexpected error: {err}"
2605        );
2606    }
2607}
2608
2609#[cfg(test)]
2610mod pack_resolution_prop_tests {
2611    use super::*;
2612    use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
2613    use proptest::prelude::*;
2614    use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
2615    use std::collections::BTreeSet;
2616    use std::path::Path;
2617    use std::str::FromStr;
2618
2619    #[derive(Clone, Debug)]
2620    enum ResolveRequest {
2621        ById(String),
2622        ByName(String),
2623    }
2624
2625    #[derive(Clone, Debug, PartialEq, Eq)]
2626    struct ResolvedComponent {
2627        key: String,
2628        source: String,
2629        artifact: String,
2630        digest: Option<String>,
2631        expected_wasm_sha256: Option<String>,
2632        skip_digest_verification: bool,
2633    }
2634
2635    #[derive(Clone, Debug, PartialEq, Eq)]
2636    struct ResolveError {
2637        code: String,
2638        message: String,
2639        context_key: String,
2640    }
2641
2642    #[derive(Clone, Debug)]
2643    struct Scenario {
2644        pack_lock: Option<PackLockV1>,
2645        component_sources: Option<ComponentSourcesV1>,
2646        request: ResolveRequest,
2647        expected_sha256: Option<String>,
2648        bytes: Vec<u8>,
2649    }
2650
2651    fn resolve_component_test(
2652        sources: Option<&ComponentSourcesV1>,
2653        lock: Option<&PackLockV1>,
2654        request: &ResolveRequest,
2655    ) -> Result<ResolvedComponent, ResolveError> {
2656        let table = if let Some(lock) = lock {
2657            component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
2658                code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
2659                message: err.to_string(),
2660                context_key: request_key(request).to_string(),
2661            })?
2662        } else {
2663            let sources = component_sources_table(sources).map_err(|err| ResolveError {
2664                code: "component_sources_error".to_string(),
2665                message: err.to_string(),
2666                context_key: request_key(request).to_string(),
2667            })?;
2668            sources.ok_or_else(|| ResolveError {
2669                code: "missing_component_sources".to_string(),
2670                message: "component sources not provided".to_string(),
2671                context_key: request_key(request).to_string(),
2672            })?
2673        };
2674
2675        let key = request_key(request);
2676        let source = table.get(key).ok_or_else(|| ResolveError {
2677            code: "component_not_found".to_string(),
2678            message: format!("component {key} not found"),
2679            context_key: key.to_string(),
2680        })?;
2681
2682        Ok(ResolvedComponent {
2683            key: key.to_string(),
2684            source: source.source.to_string(),
2685            artifact: match source.artifact {
2686                ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
2687                ComponentArtifactLocation::Remote => "remote".to_string(),
2688            },
2689            digest: source.digest.clone(),
2690            expected_wasm_sha256: source.expected_wasm_sha256.clone(),
2691            skip_digest_verification: source.skip_digest_verification,
2692        })
2693    }
2694
2695    fn request_key(request: &ResolveRequest) -> &str {
2696        match request {
2697            ResolveRequest::ById(value) => value.as_str(),
2698            ResolveRequest::ByName(value) => value.as_str(),
2699        }
2700    }
2701
2702    fn classify_pack_lock_error(message: &str) -> &'static str {
2703        if message.contains("duplicate component name") {
2704            "duplicate_name"
2705        } else if message.contains("duplicate component id") {
2706            "duplicate_id"
2707        } else if message.contains("conflicting refs") {
2708            "conflicting_ref"
2709        } else if message.contains("conflicting bundled paths") {
2710            "conflicting_bundled_path"
2711        } else if message.contains("conflicting wasm_sha256") {
2712            "conflicting_wasm_sha256"
2713        } else if message.contains("missing source_ref") {
2714            "missing_source_ref"
2715        } else if message.contains("marked bundled but bundled_path is missing") {
2716            "missing_bundled_path"
2717        } else if message.contains("missing wasm_sha256") {
2718            "missing_wasm_sha256"
2719        } else if message.contains("tag ref") && message.contains("not bundled") {
2720            "tag_ref_requires_bundle"
2721        } else if message.contains("missing resolved_digest") {
2722            "missing_resolved_digest"
2723        } else if message.contains("invalid component ref") {
2724            "invalid_component_ref"
2725        } else if message.contains("sha256 digest") {
2726            "invalid_sha256"
2727        } else {
2728            "unknown_error"
2729        }
2730    }
2731
2732    fn known_error_codes() -> BTreeSet<&'static str> {
2733        [
2734            "component_sources_error",
2735            "missing_component_sources",
2736            "component_not_found",
2737            "duplicate_name",
2738            "duplicate_id",
2739            "conflicting_ref",
2740            "conflicting_bundled_path",
2741            "conflicting_wasm_sha256",
2742            "missing_source_ref",
2743            "missing_bundled_path",
2744            "missing_wasm_sha256",
2745            "tag_ref_requires_bundle",
2746            "missing_resolved_digest",
2747            "invalid_component_ref",
2748            "invalid_sha256",
2749            "unknown_error",
2750        ]
2751        .into_iter()
2752        .collect()
2753    }
2754
2755    fn proptest_config() -> ProptestConfig {
2756        let cases = std::env::var("PROPTEST_CASES")
2757            .ok()
2758            .and_then(|value| value.parse::<u32>().ok())
2759            .unwrap_or(128);
2760        ProptestConfig {
2761            cases,
2762            failure_persistence: None,
2763            ..ProptestConfig::default()
2764        }
2765    }
2766
2767    fn proptest_seed() -> Option<[u8; 32]> {
2768        let seed = std::env::var("PROPTEST_SEED")
2769            .ok()
2770            .and_then(|value| value.parse::<u64>().ok())?;
2771        let mut bytes = [0u8; 32];
2772        bytes[..8].copy_from_slice(&seed.to_le_bytes());
2773        Some(bytes)
2774    }
2775
2776    fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
2777        let config = ProptestConfig {
2778            cases,
2779            failure_persistence: None,
2780            ..ProptestConfig::default()
2781        };
2782        let mut runner = match seed {
2783            Some(bytes) => {
2784                TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
2785            }
2786            None => TestRunner::new(config),
2787        };
2788        runner
2789            .run(&strategy, |scenario| {
2790                run_scenario(&scenario);
2791                Ok(())
2792            })
2793            .unwrap();
2794    }
2795
2796    fn run_scenario(scenario: &Scenario) {
2797        let known_codes = known_error_codes();
2798        let first = resolve_component_test(
2799            scenario.component_sources.as_ref(),
2800            scenario.pack_lock.as_ref(),
2801            &scenario.request,
2802        );
2803        let second = resolve_component_test(
2804            scenario.component_sources.as_ref(),
2805            scenario.pack_lock.as_ref(),
2806            &scenario.request,
2807        );
2808        assert_eq!(normalize_result(&first), normalize_result(&second));
2809
2810        if let Some(lock) = scenario.pack_lock.as_ref() {
2811            let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
2812            assert_eq!(normalize_result(&first), normalize_result(&lock_only));
2813        }
2814
2815        if let Err(err) = first.as_ref() {
2816            assert!(
2817                known_codes.contains(err.code.as_str()),
2818                "unexpected error code {}: {}",
2819                err.code,
2820                err.message
2821            );
2822        }
2823
2824        if let Some(expected) = scenario.expected_sha256.as_deref() {
2825            let expected_ok =
2826                verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
2827            let actual = compute_sha256_digest_for(&scenario.bytes);
2828            if actual == normalize_sha256(expected).unwrap_or_default() {
2829                assert!(expected_ok, "expected sha256 match to succeed");
2830            } else {
2831                assert!(!expected_ok, "expected sha256 mismatch to fail");
2832            }
2833        }
2834    }
2835
2836    fn normalize_result(
2837        result: &Result<ResolvedComponent, ResolveError>,
2838    ) -> Result<ResolvedComponent, ResolveError> {
2839        match result {
2840            Ok(value) => Ok(value.clone()),
2841            Err(err) => Err(err.clone()),
2842        }
2843    }
2844
2845    fn scenario_strategy() -> impl Strategy<Value = Scenario> {
2846        let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
2847        let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
2848        let tag_ref = any::<bool>();
2849        let bundled = any::<bool>();
2850        let include_sha = any::<bool>();
2851        let include_component_id = any::<bool>();
2852        let request_by_id = any::<bool>();
2853        let use_lock = any::<bool>();
2854        let use_sources = any::<bool>();
2855        let bytes = prop::collection::vec(any::<u8>(), 1..64);
2856
2857        (
2858            name,
2859            alt_name,
2860            tag_ref,
2861            bundled,
2862            include_sha,
2863            include_component_id,
2864            request_by_id,
2865            use_lock,
2866            use_sources,
2867            bytes,
2868        )
2869            .prop_map(
2870                |(
2871                    name,
2872                    alt_name,
2873                    tag_ref,
2874                    bundled,
2875                    include_sha,
2876                    include_component_id,
2877                    request_by_id,
2878                    use_lock,
2879                    use_sources,
2880                    bytes,
2881                )| {
2882                    let component_id_str = if include_component_id {
2883                        alt_name.clone()
2884                    } else {
2885                        name.clone()
2886                    };
2887                    let component_id = ComponentId::from_str(&component_id_str).ok();
2888                    let source_ref = if tag_ref {
2889                        format!("oci://registry.test/{name}:v1")
2890                    } else {
2891                        format!(
2892                            "oci://registry.test/{name}@sha256:{}",
2893                            hex::encode([0x11u8; 32])
2894                        )
2895                    };
2896                    let expected_sha256 = if bundled && include_sha {
2897                        Some(compute_sha256_digest_for(&bytes))
2898                    } else {
2899                        None
2900                    };
2901
2902                    let lock_component = PackLockComponent {
2903                        name: name.clone(),
2904                        source_ref: Some(source_ref),
2905                        legacy_ref: None,
2906                        component_id,
2907                        bundled: Some(bundled),
2908                        bundled_path: if bundled {
2909                            Some(format!("components/{name}.wasm"))
2910                        } else {
2911                            None
2912                        },
2913                        legacy_path: None,
2914                        wasm_sha256: expected_sha256.clone(),
2915                        legacy_sha256: None,
2916                        resolved_digest: if bundled {
2917                            None
2918                        } else {
2919                            Some("sha256:deadbeef".to_string())
2920                        },
2921                        digest: None,
2922                    };
2923
2924                    let pack_lock = if use_lock {
2925                        Some(PackLockV1 {
2926                            schema_version: 1,
2927                            components: vec![lock_component],
2928                        })
2929                    } else {
2930                        None
2931                    };
2932
2933                    let component_sources = if use_sources {
2934                        Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
2935                            name: name.clone(),
2936                            component_id: ComponentId::from_str(&name).ok(),
2937                            source: ComponentSourceRef::from_str(
2938                                "oci://registry.test/component@sha256:deadbeef",
2939                            )
2940                            .expect("component ref"),
2941                            resolved: ResolvedComponentV1 {
2942                                digest: "sha256:deadbeef".to_string(),
2943                                signature: None,
2944                                signed_by: None,
2945                            },
2946                            artifact: if bundled {
2947                                ArtifactLocationV1::Inline {
2948                                    wasm_path: format!("components/{name}.wasm"),
2949                                    manifest_path: None,
2950                                }
2951                            } else {
2952                                ArtifactLocationV1::Remote
2953                            },
2954                            licensing_hint: None,
2955                            metering_hint: None,
2956                        }]))
2957                    } else {
2958                        None
2959                    };
2960
2961                    let request = if request_by_id {
2962                        ResolveRequest::ById(component_id_str.clone())
2963                    } else {
2964                        ResolveRequest::ByName(name.clone())
2965                    };
2966
2967                    Scenario {
2968                        pack_lock,
2969                        component_sources,
2970                        request,
2971                        expected_sha256,
2972                        bytes,
2973                    }
2974                },
2975            )
2976    }
2977
2978    #[test]
2979    fn pack_resolution_proptest() {
2980        let seed = proptest_seed();
2981        run_cases(scenario_strategy(), proptest_config().cases, seed);
2982    }
2983
2984    #[test]
2985    fn pack_resolution_regression_seeds() {
2986        let seeds_path =
2987            Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
2988        let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
2989        for line in raw.lines() {
2990            let line = line.trim();
2991            if line.is_empty() || line.starts_with('#') {
2992                continue;
2993            }
2994            let seed = line.parse::<u64>().expect("seed must be an integer");
2995            let mut bytes = [0u8; 32];
2996            bytes[..8].copy_from_slice(&seed.to_le_bytes());
2997            run_cases(scenario_strategy(), 1, Some(bytes));
2998        }
2999    }
3000}
3001
3002fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
3003    let mut opts = DistOptions {
3004        allow_tags: true,
3005        ..DistOptions::default()
3006    };
3007    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
3008        opts.cache_dir = cache_dir;
3009    }
3010    if component_resolution.dist_offline {
3011        opts.offline = true;
3012    }
3013    opts
3014}
3015
3016#[allow(clippy::too_many_arguments)]
3017async fn load_components_from_sources(
3018    cache: &CacheManager,
3019    engine: &Engine,
3020    component_sources: &HashMap<String, ComponentSourceInfo>,
3021    component_resolution: &ComponentResolution,
3022    specs: &[ComponentSpec],
3023    missing: &mut HashSet<String>,
3024    into: &mut HashMap<String, PackComponent>,
3025    materialized_root: Option<&Path>,
3026    archive_hint: Option<&Path>,
3027) -> Result<()> {
3028    let mut archive = if let Some(path) = archive_hint {
3029        Some(
3030            ZipArchive::new(File::open(path)?)
3031                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
3032        )
3033    } else {
3034        None
3035    };
3036    let mut dist_client: Option<DistClient> = None;
3037
3038    for spec in specs {
3039        if !missing.contains(&spec.id) {
3040            continue;
3041        }
3042        let Some(source) = component_sources.get(&spec.id) else {
3043            continue;
3044        };
3045
3046        let bytes = match &source.artifact {
3047            ComponentArtifactLocation::Inline { wasm_path } => {
3048                if let Some(root) = materialized_root {
3049                    let path = root.join(wasm_path);
3050                    if path.exists() {
3051                        std::fs::read(&path).with_context(|| {
3052                            format!(
3053                                "failed to read inline component {} from {}",
3054                                spec.id,
3055                                path.display()
3056                            )
3057                        })?
3058                    } else if archive.is_none() {
3059                        bail!("inline component {} missing at {}", spec.id, path.display());
3060                    } else {
3061                        read_entry(
3062                            archive.as_mut().expect("archive present when needed"),
3063                            wasm_path,
3064                        )
3065                        .with_context(|| {
3066                            format!(
3067                                "inline component {} missing at {} in pack archive",
3068                                spec.id, wasm_path
3069                            )
3070                        })?
3071                    }
3072                } else if let Some(archive) = archive.as_mut() {
3073                    read_entry(archive, wasm_path).with_context(|| {
3074                        format!(
3075                            "inline component {} missing at {} in pack archive",
3076                            spec.id, wasm_path
3077                        )
3078                    })?
3079                } else {
3080                    bail!(
3081                        "inline component {} missing and no pack source available",
3082                        spec.id
3083                    );
3084                }
3085            }
3086            ComponentArtifactLocation::Remote => {
3087                if source.source.is_tag() {
3088                    bail!(
3089                        "component {} uses tag ref {} but is not bundled; rebuild the pack",
3090                        spec.id,
3091                        source.source
3092                    );
3093                }
3094                let client = dist_client.get_or_insert_with(|| {
3095                    DistClient::new(dist_options_from(component_resolution))
3096                });
3097                let reference = source.source.to_string();
3098                fault::maybe_fail_asset(&reference)
3099                    .await
3100                    .with_context(|| format!("fault injection blocked asset {reference}"))?;
3101                let digest = source.digest.as_deref().ok_or_else(|| {
3102                    anyhow!(
3103                        "component {} missing expected digest for remote component",
3104                        spec.id
3105                    )
3106                })?;
3107                let cache_path = if component_resolution.dist_offline {
3108                    client
3109                        .fetch_digest(digest)
3110                        .await
3111                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
3112                } else {
3113                    let resolved = client
3114                        .resolve_ref(&reference)
3115                        .await
3116                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3117                    let expected = normalize_digest(digest);
3118                    let actual = normalize_digest(&resolved.digest);
3119                    if expected != actual {
3120                        bail!(
3121                            "component {} digest mismatch after fetch: expected {}, got {}",
3122                            spec.id,
3123                            expected,
3124                            actual
3125                        );
3126                    }
3127                    resolved.cache_path.ok_or_else(|| {
3128                        anyhow!(
3129                            "component {} resolved from {} but cache path is missing",
3130                            spec.id,
3131                            reference
3132                        )
3133                    })?
3134                };
3135                std::fs::read(&cache_path).with_context(|| {
3136                    format!(
3137                        "failed to read cached component {} from {}",
3138                        spec.id,
3139                        cache_path.display()
3140                    )
3141                })?
3142            }
3143        };
3144
3145        if let Some(expected) = source.expected_wasm_sha256.as_deref() {
3146            verify_wasm_sha256(&spec.id, expected, &bytes)?;
3147        } else if source.skip_digest_verification {
3148            let actual = compute_sha256_digest_for(&bytes);
3149            warn!(
3150                component_id = %spec.id,
3151                digest = %actual,
3152                "bundled component missing wasm_sha256; allowing due to flag"
3153            );
3154        } else {
3155            let expected = source.digest.as_deref().ok_or_else(|| {
3156                anyhow!(
3157                    "component {} missing expected digest for verification",
3158                    spec.id
3159                )
3160            })?;
3161            verify_component_digest(&spec.id, expected, &bytes)?;
3162        }
3163        let component =
3164            compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
3165                .await
3166                .with_context(|| format!("failed to compile component {}", spec.id))?;
3167        into.insert(
3168            spec.id.clone(),
3169            PackComponent {
3170                name: spec.id.clone(),
3171                version: spec.version.clone(),
3172                component,
3173            },
3174        );
3175        missing.remove(&spec.id);
3176    }
3177
3178    Ok(())
3179}
3180
3181fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3182    match err {
3183        DistError::CacheMiss { reference: missing } => anyhow!(
3184            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3185            component_id,
3186            missing,
3187            reference
3188        ),
3189        DistError::Offline { reference: blocked } => anyhow!(
3190            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3191            component_id,
3192            blocked,
3193            reference
3194        ),
3195        DistError::AuthRequired { target } => anyhow!(
3196            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3197            component_id,
3198            target,
3199            reference
3200        ),
3201        other => anyhow!(
3202            "failed to resolve component {} from {}: {}",
3203            component_id,
3204            reference,
3205            other
3206        ),
3207    }
3208}
3209
3210async fn load_components_from_overrides(
3211    cache: &CacheManager,
3212    engine: &Engine,
3213    overrides: &HashMap<String, PathBuf>,
3214    specs: &[ComponentSpec],
3215    missing: &mut HashSet<String>,
3216    into: &mut HashMap<String, PackComponent>,
3217) -> Result<()> {
3218    for spec in specs {
3219        if !missing.contains(&spec.id) {
3220            continue;
3221        }
3222        let Some(path) = overrides.get(&spec.id) else {
3223            continue;
3224        };
3225        let bytes = std::fs::read(path)
3226            .with_context(|| format!("failed to read override component {}", path.display()))?;
3227        let component = compile_component_with_cache(cache, engine, None, bytes)
3228            .await
3229            .with_context(|| {
3230                format!(
3231                    "failed to compile component {} from override {}",
3232                    spec.id,
3233                    path.display()
3234                )
3235            })?;
3236        into.insert(
3237            spec.id.clone(),
3238            PackComponent {
3239                name: spec.id.clone(),
3240                version: spec.version.clone(),
3241                component,
3242            },
3243        );
3244        missing.remove(&spec.id);
3245    }
3246    Ok(())
3247}
3248
3249async fn load_components_from_dir(
3250    cache: &CacheManager,
3251    engine: &Engine,
3252    root: &Path,
3253    specs: &[ComponentSpec],
3254    missing: &mut HashSet<String>,
3255    into: &mut HashMap<String, PackComponent>,
3256) -> Result<()> {
3257    for spec in specs {
3258        if !missing.contains(&spec.id) {
3259            continue;
3260        }
3261        let path = component_path_for_spec(root, spec);
3262        if !path.exists() {
3263            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3264            continue;
3265        }
3266        let bytes = std::fs::read(&path)
3267            .with_context(|| format!("failed to read component {}", path.display()))?;
3268        let component = compile_component_with_cache(cache, engine, None, bytes)
3269            .await
3270            .with_context(|| {
3271                format!(
3272                    "failed to compile component {} from {}",
3273                    spec.id,
3274                    path.display()
3275                )
3276            })?;
3277        into.insert(
3278            spec.id.clone(),
3279            PackComponent {
3280                name: spec.id.clone(),
3281                version: spec.version.clone(),
3282                component,
3283            },
3284        );
3285        missing.remove(&spec.id);
3286    }
3287    Ok(())
3288}
3289
3290async fn load_components_from_archive(
3291    cache: &CacheManager,
3292    engine: &Engine,
3293    path: &Path,
3294    specs: &[ComponentSpec],
3295    missing: &mut HashSet<String>,
3296    into: &mut HashMap<String, PackComponent>,
3297) -> Result<()> {
3298    let mut archive = ZipArchive::new(File::open(path)?)
3299        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3300    for spec in specs {
3301        if !missing.contains(&spec.id) {
3302            continue;
3303        }
3304        let file_name = spec
3305            .legacy_path
3306            .clone()
3307            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3308        let bytes = match read_entry(&mut archive, &file_name) {
3309            Ok(bytes) => bytes,
3310            Err(err) => {
3311                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3312                continue;
3313            }
3314        };
3315        let component = compile_component_with_cache(cache, engine, None, bytes)
3316            .await
3317            .with_context(|| format!("failed to compile component {}", spec.id))?;
3318        into.insert(
3319            spec.id.clone(),
3320            PackComponent {
3321                name: spec.id.clone(),
3322                version: spec.version.clone(),
3323                component,
3324            },
3325        );
3326        missing.remove(&spec.id);
3327    }
3328    Ok(())
3329}
3330
3331#[cfg(test)]
3332mod tests {
3333    use super::*;
3334    use greentic_flow::model::{FlowDoc, NodeDoc};
3335    use indexmap::IndexMap;
3336    use serde_json::json;
3337
3338    #[test]
3339    fn normalizes_raw_component_to_component_exec() {
3340        let mut nodes = IndexMap::new();
3341        let mut raw = IndexMap::new();
3342        raw.insert(
3343            "templating.handlebars".into(),
3344            json!({ "template": "Hi {{name}}" }),
3345        );
3346        nodes.insert(
3347            "start".into(),
3348            NodeDoc {
3349                raw,
3350                routing: json!([{"out": true}]),
3351                ..Default::default()
3352            },
3353        );
3354        let doc = FlowDoc {
3355            id: "welcome".into(),
3356            title: None,
3357            description: None,
3358            flow_type: "messaging".into(),
3359            start: Some("start".into()),
3360            parameters: json!({}),
3361            tags: Vec::new(),
3362            schema_version: None,
3363            entrypoints: IndexMap::new(),
3364            nodes,
3365        };
3366
3367        let normalized = normalize_flow_doc(doc);
3368        let node = normalized.nodes.get("start").expect("node exists");
3369        assert_eq!(node.operation.as_deref(), Some("component.exec"));
3370        assert!(node.raw.is_empty());
3371        let payload = node.payload.as_object().expect("payload object");
3372        assert_eq!(
3373            payload.get("component"),
3374            Some(&Value::String("templating.handlebars".into()))
3375        );
3376        assert_eq!(
3377            payload.get("operation"),
3378            Some(&Value::String("render".into()))
3379        );
3380        let input = payload.get("input").unwrap();
3381        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
3382    }
3383}
3384
3385#[derive(Clone, Debug, Default, Serialize, Deserialize)]
3386pub struct PackMetadata {
3387    pub pack_id: String,
3388    pub version: String,
3389    #[serde(default)]
3390    pub entry_flows: Vec<String>,
3391    #[serde(default)]
3392    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
3393}
3394
3395impl PackMetadata {
3396    fn from_wasm(bytes: &[u8]) -> Option<Self> {
3397        let parser = Parser::new(0);
3398        for payload in parser.parse_all(bytes) {
3399            let payload = payload.ok()?;
3400            match payload {
3401                Payload::CustomSection(section) => {
3402                    if section.name() == "greentic.manifest"
3403                        && let Ok(meta) = Self::from_bytes(section.data())
3404                    {
3405                        return Some(meta);
3406                    }
3407                }
3408                Payload::DataSection(reader) => {
3409                    for segment in reader.into_iter().flatten() {
3410                        if let Ok(meta) = Self::from_bytes(segment.data) {
3411                            return Some(meta);
3412                        }
3413                    }
3414                }
3415                _ => {}
3416            }
3417        }
3418        None
3419    }
3420
3421    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
3422        #[derive(Deserialize)]
3423        struct RawManifest {
3424            pack_id: String,
3425            version: String,
3426            #[serde(default)]
3427            entry_flows: Vec<String>,
3428            #[serde(default)]
3429            flows: Vec<RawFlow>,
3430            #[serde(default)]
3431            secret_requirements: Vec<greentic_types::SecretRequirement>,
3432        }
3433
3434        #[derive(Deserialize)]
3435        struct RawFlow {
3436            id: String,
3437        }
3438
3439        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
3440        let mut entry_flows = if manifest.entry_flows.is_empty() {
3441            manifest.flows.iter().map(|f| f.id.clone()).collect()
3442        } else {
3443            manifest.entry_flows.clone()
3444        };
3445        entry_flows.retain(|id| !id.is_empty());
3446        Ok(Self {
3447            pack_id: manifest.pack_id,
3448            version: manifest.version,
3449            entry_flows,
3450            secret_requirements: manifest.secret_requirements,
3451        })
3452    }
3453
3454    pub fn fallback(path: &Path) -> Self {
3455        let pack_id = path
3456            .file_stem()
3457            .map(|s| s.to_string_lossy().into_owned())
3458            .unwrap_or_else(|| "unknown-pack".to_string());
3459        Self {
3460            pack_id,
3461            version: "0.0.0".to_string(),
3462            entry_flows: Vec::new(),
3463            secret_requirements: Vec::new(),
3464        }
3465    }
3466
3467    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
3468        let entry_flows = manifest
3469            .flows
3470            .iter()
3471            .map(|flow| flow.id.as_str().to_string())
3472            .collect::<Vec<_>>();
3473        Self {
3474            pack_id: manifest.pack_id.as_str().to_string(),
3475            version: manifest.version.to_string(),
3476            entry_flows,
3477            secret_requirements: manifest.secret_requirements.clone(),
3478        }
3479    }
3480}