greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21    self as host_v1, HostFns, add_all_v1_to_linker,
22    runner_host_http::RunnerHostHttp,
23    runner_host_kv::RunnerHostKv,
24    secrets_store::{SecretsError, SecretsStoreHost},
25    state_store::{
26        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27        StateStoreHost, TenantCtx as StateTenantCtx,
28    },
29    telemetry_logger::{
30        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32        TenantCtx as TelemetryTenantCtx,
33    },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
41    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
42    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
43    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44    pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69
70use crate::config::HostConfig;
71use crate::fault;
72use crate::secrets::{DynSecretsManager, read_secret_blocking};
73use crate::storage::state::STATE_PREFIX;
74use crate::storage::{DynSessionStore, DynStateStore};
75use crate::verify;
76use crate::wasi::RunnerWasiPolicy;
77use tracing::warn;
78use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
79use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
80
81use greentic_flow::model::FlowDoc;
82
83#[allow(dead_code)]
84pub struct PackRuntime {
85    /// Component artifact path (wasm file).
86    path: PathBuf,
87    /// Optional archive (.gtpack) used to load flows/manifests.
88    archive_path: Option<PathBuf>,
89    config: Arc<HostConfig>,
90    engine: Engine,
91    metadata: PackMetadata,
92    manifest: Option<greentic_types::PackManifest>,
93    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
94    component_manifests: HashMap<String, ComponentManifest>,
95    mocks: Option<Arc<MockLayer>>,
96    flows: Option<PackFlows>,
97    components: HashMap<String, PackComponent>,
98    http_client: Arc<BlockingClient>,
99    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
100    session_store: Option<DynSessionStore>,
101    state_store: Option<DynStateStore>,
102    wasi_policy: Arc<RunnerWasiPolicy>,
103    provider_registry: RwLock<Option<ProviderRegistry>>,
104    secrets: DynSecretsManager,
105    oauth_config: Option<OAuthBrokerConfig>,
106    cache: CacheManager,
107}
108
109struct PackComponent {
110    #[allow(dead_code)]
111    name: String,
112    #[allow(dead_code)]
113    version: String,
114    component: Arc<Component>,
115}
116
/// Options controlling where component wasm artifacts are resolved from.
///
/// The `Default` value has no materialized root, no overrides, no cache directory and
/// both flags set to `false`.
#[derive(Clone, Debug, Default)]
pub struct ComponentResolution {
    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
    pub materialized_root: Option<PathBuf>,
    /// Explicit overrides mapping component id -> wasm path.
    pub overrides: HashMap<String, PathBuf>,
    /// If true, do not fetch remote components; require cached artifacts.
    pub dist_offline: bool,
    /// Optional cache directory for resolved remote components.
    pub dist_cache_dir: Option<PathBuf>,
    /// Allow bundled components without wasm_sha256 (dev-only escape hatch).
    pub allow_missing_hash: bool,
}
130
131fn build_blocking_client() -> BlockingClient {
132    std::thread::spawn(|| {
133        BlockingClient::builder()
134            .no_proxy()
135            .build()
136            .expect("blocking client")
137    })
138    .join()
139    .expect("client build thread panicked")
140}
141
142fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
143    let (root, candidate) = if path.is_absolute() {
144        let parent = path
145            .parent()
146            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
147        let root = parent
148            .canonicalize()
149            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
150        let file = path
151            .file_name()
152            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
153        (root, PathBuf::from(file))
154    } else {
155        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
156        let base = if let Some(parent) = path.parent() {
157            cwd.join(parent)
158        } else {
159            cwd
160        };
161        let root = base
162            .canonicalize()
163            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
164        let file = path
165            .file_name()
166            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
167        (root, PathBuf::from(file))
168    };
169    let safe = normalize_under_root(&root, &candidate)?;
170    Ok((root, safe))
171}
172
173static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
174
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct FlowDescriptor {
177    pub id: String,
178    #[serde(rename = "type")]
179    pub flow_type: String,
180    pub pack_id: String,
181    pub profile: String,
182    pub version: String,
183    #[serde(default)]
184    pub description: Option<String>,
185}
186
187pub struct HostState {
188    config: Arc<HostConfig>,
189    http_client: Arc<BlockingClient>,
190    default_env: String,
191    #[allow(dead_code)]
192    session_store: Option<DynSessionStore>,
193    state_store: Option<DynStateStore>,
194    mocks: Option<Arc<MockLayer>>,
195    secrets: DynSecretsManager,
196    oauth_config: Option<OAuthBrokerConfig>,
197    oauth_host: OAuthBrokerHost,
198    exec_ctx: Option<ComponentExecCtx>,
199}
200
201impl HostState {
202    #[allow(clippy::default_constructed_unit_structs)]
203    #[allow(clippy::too_many_arguments)]
204    pub fn new(
205        config: Arc<HostConfig>,
206        http_client: Arc<BlockingClient>,
207        mocks: Option<Arc<MockLayer>>,
208        session_store: Option<DynSessionStore>,
209        state_store: Option<DynStateStore>,
210        secrets: DynSecretsManager,
211        oauth_config: Option<OAuthBrokerConfig>,
212        exec_ctx: Option<ComponentExecCtx>,
213    ) -> Result<Self> {
214        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
215        Ok(Self {
216            config,
217            http_client,
218            default_env,
219            session_store,
220            state_store,
221            mocks,
222            secrets,
223            oauth_config,
224            oauth_host: OAuthBrokerHost::default(),
225            exec_ctx,
226        })
227    }
228
229    pub fn get_secret(&self, key: &str) -> Result<String> {
230        if provider_core_only::is_enabled() {
231            bail!(provider_core_only::blocked_message("secrets"))
232        }
233        if !self.config.secrets_policy.is_allowed(key) {
234            bail!("secret {key} is not permitted by bindings policy");
235        }
236        if let Some(mock) = &self.mocks
237            && let Some(value) = mock.secrets_lookup(key)
238        {
239            return Ok(value);
240        }
241        let ctx = self.config.tenant_ctx();
242        let bytes = read_secret_blocking(&self.secrets, &ctx, key)
243            .context("failed to read secret from manager")?;
244        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
245        Ok(value)
246    }
247
248    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
249        let tenant_raw = ctx
250            .as_ref()
251            .map(|ctx| ctx.tenant.clone())
252            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
253            .unwrap_or_else(|| self.config.tenant.clone());
254        let env_raw = ctx
255            .as_ref()
256            .map(|ctx| ctx.env.clone())
257            .unwrap_or_else(|| self.default_env.clone());
258        let tenant_id = TenantId::from_str(&tenant_raw)
259            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
260        let env_id = EnvId::from_str(&env_raw)
261            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
262        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
263        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
264            if let Some(team) = exec_ctx.tenant.team.as_ref() {
265                let team_id =
266                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
267                tenant_ctx = tenant_ctx.with_team(Some(team_id));
268            }
269            if let Some(user) = exec_ctx.tenant.user.as_ref() {
270                let user_id =
271                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
272                tenant_ctx = tenant_ctx.with_user(Some(user_id));
273            }
274            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
275            if let Some(node) = exec_ctx.node_id.as_ref() {
276                tenant_ctx = tenant_ctx.with_node(node.clone());
277            }
278            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
279                tenant_ctx = tenant_ctx.with_session(session.clone());
280            }
281            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
282        }
283
284        if let Some(ctx) = ctx {
285            if let Some(team) = ctx.team.or(ctx.team_id) {
286                let team_id =
287                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
288                tenant_ctx = tenant_ctx.with_team(Some(team_id));
289            }
290            if let Some(user) = ctx.user.or(ctx.user_id) {
291                let user_id =
292                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
293                tenant_ctx = tenant_ctx.with_user(Some(user_id));
294            }
295            if let Some(flow) = ctx.flow_id {
296                tenant_ctx = tenant_ctx.with_flow(flow);
297            }
298            if let Some(node) = ctx.node_id {
299                tenant_ctx = tenant_ctx.with_node(node);
300            }
301            if let Some(provider) = ctx.provider_id {
302                tenant_ctx = tenant_ctx.with_provider(provider);
303            }
304            if let Some(session) = ctx.session_id {
305                tenant_ctx = tenant_ctx.with_session(session);
306            }
307            tenant_ctx.trace_id = ctx.trace_id;
308        }
309        Ok(tenant_ctx)
310    }
311
312    fn send_http_request(
313        &mut self,
314        req: HttpRequest,
315        opts: Option<HttpRequestOptionsV1_1>,
316        _ctx: Option<HttpTenantCtx>,
317    ) -> Result<HttpResponse, HttpClientError> {
318        if !self.config.http_enabled {
319            return Err(HttpClientError {
320                code: "denied".into(),
321                message: "http client disabled by policy".into(),
322            });
323        }
324
325        let mut mock_state = None;
326        let raw_body = req.body.clone();
327        if let Some(mock) = &self.mocks
328            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
329        {
330            match mock.http_begin(&meta) {
331                HttpDecision::Mock(response) => {
332                    let headers = response
333                        .headers
334                        .iter()
335                        .map(|(k, v)| (k.clone(), v.clone()))
336                        .collect();
337                    return Ok(HttpResponse {
338                        status: response.status,
339                        headers,
340                        body: response.body.clone().map(|b| b.into_bytes()),
341                    });
342                }
343                HttpDecision::Deny(reason) => {
344                    return Err(HttpClientError {
345                        code: "denied".into(),
346                        message: reason,
347                    });
348                }
349                HttpDecision::Passthrough { record } => {
350                    mock_state = Some((meta, record));
351                }
352            }
353        }
354
355        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
356        let mut builder = self.http_client.request(method, &req.url);
357        for (key, value) in req.headers {
358            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
359                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
360            {
361                builder = builder.header(header, header_value);
362            }
363        }
364
365        if let Some(body) = raw_body.clone() {
366            builder = builder.body(body);
367        }
368
369        if let Some(opts) = opts {
370            if let Some(timeout_ms) = opts.timeout_ms {
371                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
372            }
373            if opts.allow_insecure == Some(true) {
374                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
375            }
376            if let Some(follow_redirects) = opts.follow_redirects
377                && !follow_redirects
378            {
379                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
380            }
381        }
382
383        let response = match builder.send() {
384            Ok(resp) => resp,
385            Err(err) => {
386                warn!(url = %req.url, error = %err, "http client request failed");
387                return Err(HttpClientError {
388                    code: "unavailable".into(),
389                    message: err.to_string(),
390                });
391            }
392        };
393
394        let status = response.status().as_u16();
395        let headers_vec = response
396            .headers()
397            .iter()
398            .map(|(k, v)| {
399                (
400                    k.as_str().to_string(),
401                    v.to_str().unwrap_or_default().to_string(),
402                )
403            })
404            .collect::<Vec<_>>();
405        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
406
407        if let Some((meta, true)) = mock_state.take()
408            && let Some(mock) = &self.mocks
409        {
410            let recorded = HttpMockResponse::new(
411                status,
412                headers_vec.clone().into_iter().collect(),
413                body_bytes
414                    .as_ref()
415                    .map(|b| String::from_utf8_lossy(b).into_owned()),
416            );
417            mock.http_record(&meta, &recorded);
418        }
419
420        Ok(HttpResponse {
421            status,
422            headers: headers_vec,
423            body: body_bytes,
424        })
425    }
426}
427
428impl SecretsStoreHost for HostState {
429    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
430        if provider_core_only::is_enabled() {
431            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
432            return Err(SecretsError::Denied);
433        }
434        if !self.config.secrets_policy.is_allowed(&key) {
435            return Err(SecretsError::Denied);
436        }
437        if let Some(mock) = &self.mocks
438            && let Some(value) = mock.secrets_lookup(&key)
439        {
440            return Ok(Some(value.into_bytes()));
441        }
442        let ctx = self.config.tenant_ctx();
443        match read_secret_blocking(&self.secrets, &ctx, &key) {
444            Ok(bytes) => Ok(Some(bytes)),
445            Err(err) => {
446                warn!(secret = %key, error = %err, "secret lookup failed");
447                Err(SecretsError::NotFound)
448            }
449        }
450    }
451}
452
453impl HttpClientHost for HostState {
454    fn send(
455        &mut self,
456        req: HttpRequest,
457        ctx: Option<HttpTenantCtx>,
458    ) -> Result<HttpResponse, HttpClientError> {
459        self.send_http_request(req, None, ctx)
460    }
461}
462
463impl HttpClientHostV1_1 for HostState {
464    fn send(
465        &mut self,
466        req: HttpRequestV1_1,
467        opts: Option<HttpRequestOptionsV1_1>,
468        ctx: Option<HttpTenantCtxV1_1>,
469    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
470        let legacy_req = HttpRequest {
471            method: req.method,
472            url: req.url,
473            headers: req.headers,
474            body: req.body,
475        };
476        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
477            env: ctx.env,
478            tenant: ctx.tenant,
479            tenant_id: ctx.tenant_id,
480            team: ctx.team,
481            team_id: ctx.team_id,
482            user: ctx.user,
483            user_id: ctx.user_id,
484            trace_id: ctx.trace_id,
485            correlation_id: ctx.correlation_id,
486            attributes: ctx.attributes,
487            session_id: ctx.session_id,
488            flow_id: ctx.flow_id,
489            node_id: ctx.node_id,
490            provider_id: ctx.provider_id,
491            deadline_ms: ctx.deadline_ms,
492            attempt: ctx.attempt,
493            idempotency_key: ctx.idempotency_key,
494            impersonation: ctx
495                .impersonation
496                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
497                    actor_id,
498                    reason,
499                }),
500        });
501
502        self.send_http_request(legacy_req, opts, legacy_ctx)
503            .map(|resp| HttpResponseV1_1 {
504                status: resp.status,
505                headers: resp.headers,
506                body: resp.body,
507            })
508            .map_err(|err| HttpClientErrorV1_1 {
509                code: err.code,
510                message: err.message,
511            })
512    }
513}
514
515impl StateStoreHost for HostState {
516    fn read(
517        &mut self,
518        key: HostStateKey,
519        ctx: Option<StateTenantCtx>,
520    ) -> Result<Vec<u8>, StateError> {
521        let store = match self.state_store.as_ref() {
522            Some(store) => store.clone(),
523            None => {
524                return Err(StateError {
525                    code: "unavailable".into(),
526                    message: "state store not configured".into(),
527                });
528            }
529        };
530        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
531            Ok(ctx) => ctx,
532            Err(err) => {
533                return Err(StateError {
534                    code: "invalid-ctx".into(),
535                    message: err.to_string(),
536                });
537            }
538        };
539        let key = StoreStateKey::from(key);
540        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
541            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
542            Ok(None) => Err(StateError {
543                code: "not_found".into(),
544                message: "state key not found".into(),
545            }),
546            Err(err) => Err(StateError {
547                code: "internal".into(),
548                message: err.to_string(),
549            }),
550        }
551    }
552
553    fn write(
554        &mut self,
555        key: HostStateKey,
556        bytes: Vec<u8>,
557        ctx: Option<StateTenantCtx>,
558    ) -> Result<StateOpAck, StateError> {
559        let store = match self.state_store.as_ref() {
560            Some(store) => store.clone(),
561            None => {
562                return Err(StateError {
563                    code: "unavailable".into(),
564                    message: "state store not configured".into(),
565                });
566            }
567        };
568        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
569            Ok(ctx) => ctx,
570            Err(err) => {
571                return Err(StateError {
572                    code: "invalid-ctx".into(),
573                    message: err.to_string(),
574                });
575            }
576        };
577        let key = StoreStateKey::from(key);
578        let value = serde_json::from_slice(&bytes)
579            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
580        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
581            Ok(()) => Ok(StateOpAck::Ok),
582            Err(err) => Err(StateError {
583                code: "internal".into(),
584                message: err.to_string(),
585            }),
586        }
587    }
588
589    fn delete(
590        &mut self,
591        key: HostStateKey,
592        ctx: Option<StateTenantCtx>,
593    ) -> Result<StateOpAck, StateError> {
594        let store = match self.state_store.as_ref() {
595            Some(store) => store.clone(),
596            None => {
597                return Err(StateError {
598                    code: "unavailable".into(),
599                    message: "state store not configured".into(),
600                });
601            }
602        };
603        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
604            Ok(ctx) => ctx,
605            Err(err) => {
606                return Err(StateError {
607                    code: "invalid-ctx".into(),
608                    message: err.to_string(),
609                });
610            }
611        };
612        let key = StoreStateKey::from(key);
613        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
614            Ok(_) => Ok(StateOpAck::Ok),
615            Err(err) => Err(StateError {
616                code: "internal".into(),
617                message: err.to_string(),
618            }),
619        }
620    }
621}
622
623impl TelemetryLoggerHost for HostState {
624    fn log(
625        &mut self,
626        span: TelemetrySpanContext,
627        fields: Vec<(String, String)>,
628        _ctx: Option<TelemetryTenantCtx>,
629    ) -> Result<TelemetryAck, TelemetryError> {
630        if let Some(mock) = &self.mocks
631            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
632        {
633            return Ok(TelemetryAck::Ok);
634        }
635        let mut map = serde_json::Map::new();
636        for (k, v) in fields {
637            map.insert(k, Value::String(v));
638        }
639        tracing::info!(
640            tenant = %span.tenant,
641            flow_id = %span.flow_id,
642            node = ?span.node_id,
643            provider = %span.provider,
644            fields = %serde_json::Value::Object(map.clone()),
645            "telemetry log from pack"
646        );
647        Ok(TelemetryAck::Ok)
648    }
649}
650
651impl RunnerHostHttp for HostState {
652    fn request(
653        &mut self,
654        method: String,
655        url: String,
656        headers: Vec<String>,
657        body: Option<Vec<u8>>,
658    ) -> Result<Vec<u8>, String> {
659        let req = HttpRequest {
660            method,
661            url,
662            headers: headers
663                .chunks(2)
664                .filter_map(|chunk| {
665                    if chunk.len() == 2 {
666                        Some((chunk[0].clone(), chunk[1].clone()))
667                    } else {
668                        None
669                    }
670                })
671                .collect(),
672            body,
673        };
674        match HttpClientHost::send(self, req, None) {
675            Ok(resp) => Ok(resp.body.unwrap_or_default()),
676            Err(err) => Err(err.message),
677        }
678    }
679}
680
681impl RunnerHostKv for HostState {
682    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
683        None
684    }
685
686    fn put(&mut self, _ns: String, _key: String, _val: String) {}
687}
688
689enum ManifestLoad {
690    New {
691        manifest: Box<greentic_types::PackManifest>,
692        flows: PackFlows,
693    },
694    Legacy {
695        manifest: Box<legacy_pack::PackManifest>,
696        flows: PackFlows,
697    },
698}
699
700fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
701    let mut archive = ZipArchive::new(File::open(path)?)
702        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
703    let bytes = read_entry(&mut archive, "manifest.cbor")
704        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
705    match decode_pack_manifest(&bytes) {
706        Ok(manifest) => {
707            let cache = PackFlows::from_manifest(manifest.clone());
708            Ok(ManifestLoad::New {
709                manifest: Box::new(manifest),
710                flows: cache,
711            })
712        }
713        Err(err) => {
714            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
715            // Fall back to legacy pack manifest
716            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
717                .context("failed to decode legacy pack manifest from manifest.cbor")?;
718            let flows = load_legacy_flows(&mut archive, &legacy)?;
719            Ok(ManifestLoad::Legacy {
720                manifest: Box::new(legacy),
721                flows,
722            })
723        }
724    }
725}
726
727fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
728    let manifest_path = root.join("manifest.cbor");
729    let bytes = std::fs::read(&manifest_path)
730        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
731    match decode_pack_manifest(&bytes) {
732        Ok(manifest) => {
733            let cache = PackFlows::from_manifest(manifest.clone());
734            Ok(ManifestLoad::New {
735                manifest: Box::new(manifest),
736                flows: cache,
737            })
738        }
739        Err(err) => {
740            tracing::debug!(
741                error = %err,
742                pack = %root.display(),
743                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
744            );
745            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
746                .context("failed to decode legacy pack manifest from manifest.cbor")?;
747            let flows = load_legacy_flows_from_dir(root, &legacy)?;
748            Ok(ManifestLoad::Legacy {
749                manifest: Box::new(legacy),
750                flows,
751            })
752        }
753    }
754}
755
756fn load_legacy_flows(
757    archive: &mut ZipArchive<File>,
758    manifest: &legacy_pack::PackManifest,
759) -> Result<PackFlows> {
760    let mut flows = HashMap::new();
761    let mut descriptors = Vec::new();
762
763    for entry in &manifest.flows {
764        let bytes = read_entry(archive, &entry.file_json)
765            .with_context(|| format!("missing flow json {}", entry.file_json))?;
766        let doc: FlowDoc = serde_json::from_slice(&bytes)
767            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
768        let normalized = normalize_flow_doc(doc);
769        let flow_ir = flow_doc_to_ir(normalized)?;
770        let flow = flow_ir_to_flow(flow_ir)?;
771
772        descriptors.push(FlowDescriptor {
773            id: entry.id.clone(),
774            flow_type: entry.kind.clone(),
775            pack_id: manifest.meta.pack_id.clone(),
776            profile: manifest.meta.pack_id.clone(),
777            version: manifest.meta.version.to_string(),
778            description: None,
779        });
780        flows.insert(entry.id.clone(), flow);
781    }
782
783    let mut entry_flows = manifest.meta.entry_flows.clone();
784    if entry_flows.is_empty() {
785        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
786    }
787    let metadata = PackMetadata {
788        pack_id: manifest.meta.pack_id.clone(),
789        version: manifest.meta.version.to_string(),
790        entry_flows,
791        secret_requirements: Vec::new(),
792    };
793
794    Ok(PackFlows {
795        descriptors,
796        flows,
797        metadata,
798    })
799}
800
801fn load_legacy_flows_from_dir(
802    root: &Path,
803    manifest: &legacy_pack::PackManifest,
804) -> Result<PackFlows> {
805    let mut flows = HashMap::new();
806    let mut descriptors = Vec::new();
807
808    for entry in &manifest.flows {
809        let path = root.join(&entry.file_json);
810        let bytes = std::fs::read(&path)
811            .with_context(|| format!("missing flow json {}", path.display()))?;
812        let doc: FlowDoc = serde_json::from_slice(&bytes)
813            .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
814        let normalized = normalize_flow_doc(doc);
815        let flow_ir = flow_doc_to_ir(normalized)?;
816        let flow = flow_ir_to_flow(flow_ir)?;
817
818        descriptors.push(FlowDescriptor {
819            id: entry.id.clone(),
820            flow_type: entry.kind.clone(),
821            pack_id: manifest.meta.pack_id.clone(),
822            profile: manifest.meta.pack_id.clone(),
823            version: manifest.meta.version.to_string(),
824            description: None,
825        });
826        flows.insert(entry.id.clone(), flow);
827    }
828
829    let mut entry_flows = manifest.meta.entry_flows.clone();
830    if entry_flows.is_empty() {
831        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
832    }
833    let metadata = PackMetadata {
834        pack_id: manifest.meta.pack_id.clone(),
835        version: manifest.meta.version.to_string(),
836        entry_flows,
837        secret_requirements: Vec::new(),
838    };
839
840    Ok(PackFlows {
841        descriptors,
842        flows,
843        metadata,
844    })
845}
846
/// Store data used when instantiating and calling a pack component.
///
/// Bundles the runner's [`HostState`] with the WASI context and resource table
/// that are handed out together through the `WasiView` implementation.
pub struct ComponentState {
    /// Host-side state that the registered host interfaces operate on.
    pub host: HostState,
    /// WASI context produced by the runner's WASI policy in `ComponentState::new`.
    wasi_ctx: WasiCtx,
    /// Resource table exposed alongside `wasi_ctx`.
    resource_table: ResourceTable,
}
852
853impl ComponentState {
854    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
855        let wasi_ctx = policy
856            .instantiate()
857            .context("failed to build WASI context")?;
858        Ok(Self {
859            host,
860            wasi_ctx,
861            resource_table: ResourceTable::new(),
862        })
863    }
864
865    fn host_mut(&mut self) -> &mut HostState {
866        &mut self.host
867    }
868
869    fn should_cancel_host(&mut self) -> bool {
870        false
871    }
872
873    fn yield_now_host(&mut self) {
874        // no-op cooperative yield
875    }
876}
877
878impl component_api::v0_4::greentic::component::control::Host for ComponentState {
879    fn should_cancel(&mut self) -> bool {
880        self.should_cancel_host()
881    }
882
883    fn yield_now(&mut self) {
884        self.yield_now_host();
885    }
886}
887
888impl component_api::v0_5::greentic::component::control::Host for ComponentState {
889    fn should_cancel(&mut self) -> bool {
890        self.should_cancel_host()
891    }
892
893    fn yield_now(&mut self) {
894        self.yield_now_host();
895    }
896}
897
898fn add_component_control_instance(
899    linker: &mut Linker<ComponentState>,
900    name: &str,
901) -> wasmtime::Result<()> {
902    let mut inst = linker.instance(name)?;
903    inst.func_wrap(
904        "should-cancel",
905        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
906            let host = caller.data_mut();
907            Ok((host.should_cancel_host(),))
908        },
909    )?;
910    inst.func_wrap(
911        "yield-now",
912        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
913            let host = caller.data_mut();
914            host.yield_now_host();
915            Ok(())
916        },
917    )?;
918    Ok(())
919}
920
921fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
922    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
923    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
924    Ok(())
925}
926
/// Registers WASI and the v1 host interfaces on `linker`.
///
/// Every registered interface resolves its host implementation through
/// `ComponentState::host_mut`. The OAuth broker is not registered here
/// (`oauth_broker: None`), and the state store is only registered when
/// `allow_state_store` is `true`.
///
/// # Errors
///
/// Propagates any failure from `add_wasi_to_linker` or `add_all_v1_to_linker`.
pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            // `then_some` yields `None` when state-store access is not allowed.
            state_store: allow_state_store.then_some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
944
945impl OAuthHostContext for ComponentState {
946    fn tenant_id(&self) -> &str {
947        &self.host.config.tenant
948    }
949
950    fn env(&self) -> &str {
951        &self.host.default_env
952    }
953
954    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
955        &mut self.host.oauth_host
956    }
957
958    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
959        self.host.oauth_config.as_ref()
960    }
961}
962
963impl WasiView for ComponentState {
964    fn ctx(&mut self) -> WasiCtxView<'_> {
965        WasiCtxView {
966            ctx: &mut self.wasi_ctx,
967            table: &mut self.resource_table,
968        }
969    }
970}
971
// Manual thread-safety markers for `ComponentState`.
//
// NOTE(review): no `SAFETY` justification is given for these impls. They are only
// sound if every field (`HostState`, `WasiCtx`, `ResourceTable`) can really be
// moved to and shared between threads; that cannot be verified from this block.
// TODO confirm and record the invariant here, or drop the impls if the compiler
// can derive them automatically.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
976
977impl PackRuntime {
978    fn allows_state_store(&self, component_ref: &str) -> bool {
979        if self.state_store.is_none() {
980            return false;
981        }
982        if !self.config.state_store_policy.allow {
983            return false;
984        }
985        let Some(manifest) = self.component_manifests.get(component_ref) else {
986            return false;
987        };
988        manifest
989            .capabilities
990            .host
991            .state
992            .as_ref()
993            .map(|caps| caps.read || caps.write)
994            .unwrap_or(false)
995    }
996
997    #[allow(clippy::too_many_arguments)]
998    pub async fn load(
999        path: impl AsRef<Path>,
1000        config: Arc<HostConfig>,
1001        mocks: Option<Arc<MockLayer>>,
1002        archive_source: Option<&Path>,
1003        session_store: Option<DynSessionStore>,
1004        state_store: Option<DynStateStore>,
1005        wasi_policy: Arc<RunnerWasiPolicy>,
1006        secrets: DynSecretsManager,
1007        oauth_config: Option<OAuthBrokerConfig>,
1008        verify_archive: bool,
1009        component_resolution: ComponentResolution,
1010    ) -> Result<Self> {
1011        let path = path.as_ref();
1012        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1013        let path_meta = std::fs::metadata(&safe_path).ok();
1014        let is_dir = path_meta
1015            .as_ref()
1016            .map(|meta| meta.is_dir())
1017            .unwrap_or(false);
1018        let is_component = !is_dir
1019            && safe_path
1020                .extension()
1021                .and_then(|ext| ext.to_str())
1022                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1023                .unwrap_or(false);
1024        let archive_hint_path = if let Some(source) = archive_source {
1025            let (_, normalized) = normalize_pack_path(source)?;
1026            Some(normalized)
1027        } else if is_component || is_dir {
1028            None
1029        } else {
1030            Some(safe_path.clone())
1031        };
1032        let archive_hint = archive_hint_path.as_deref();
1033        if verify_archive {
1034            if let Some(verify_target) = archive_hint.and_then(|p| {
1035                std::fs::metadata(p)
1036                    .ok()
1037                    .filter(|meta| meta.is_file())
1038                    .map(|_| p)
1039            }) {
1040                verify::verify_pack(verify_target).await?;
1041                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1042            } else {
1043                tracing::debug!("skipping archive verification (no archive source)");
1044            }
1045        }
1046        let engine = Engine::default();
1047        let engine_profile =
1048            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1049        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1050        let mut metadata = PackMetadata::fallback(&safe_path);
1051        let mut manifest = None;
1052        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1053        let mut flows = None;
1054        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1055            if is_dir {
1056                Some(safe_path.clone())
1057            } else {
1058                None
1059            }
1060        });
1061
1062        if let Some(root) = materialized_root.as_ref() {
1063            match load_manifest_and_flows_from_dir(root) {
1064                Ok(ManifestLoad::New {
1065                    manifest: m,
1066                    flows: cache,
1067                }) => {
1068                    metadata = cache.metadata.clone();
1069                    manifest = Some(*m);
1070                    flows = Some(cache);
1071                }
1072                Ok(ManifestLoad::Legacy {
1073                    manifest: m,
1074                    flows: cache,
1075                }) => {
1076                    metadata = cache.metadata.clone();
1077                    legacy_manifest = Some(m);
1078                    flows = Some(cache);
1079                }
1080                Err(err) => {
1081                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1082                }
1083            }
1084        }
1085
1086        if manifest.is_none()
1087            && legacy_manifest.is_none()
1088            && let Some(archive_path) = archive_hint
1089        {
1090            match load_manifest_and_flows(archive_path) {
1091                Ok(ManifestLoad::New {
1092                    manifest: m,
1093                    flows: cache,
1094                }) => {
1095                    metadata = cache.metadata.clone();
1096                    manifest = Some(*m);
1097                    flows = Some(cache);
1098                }
1099                Ok(ManifestLoad::Legacy {
1100                    manifest: m,
1101                    flows: cache,
1102                }) => {
1103                    metadata = cache.metadata.clone();
1104                    legacy_manifest = Some(m);
1105                    flows = Some(cache);
1106                }
1107                Err(err) => {
1108                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1109                }
1110            }
1111        }
1112        let mut pack_lock = None;
1113        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1114            pack_lock = load_pack_lock(&root)?;
1115            if pack_lock.is_some() {
1116                break;
1117            }
1118        }
1119        let component_sources_payload = if pack_lock.is_none() {
1120            if let Some(manifest) = manifest.as_ref() {
1121                manifest
1122                    .get_component_sources_v1()
1123                    .context("invalid component sources extension")?
1124            } else {
1125                None
1126            }
1127        } else {
1128            None
1129        };
1130        let component_sources = if let Some(lock) = pack_lock.as_ref() {
1131            Some(component_sources_table_from_pack_lock(
1132                lock,
1133                component_resolution.allow_missing_hash,
1134            )?)
1135        } else {
1136            component_sources_table(component_sources_payload.as_ref())?
1137        };
1138        let components = if is_component {
1139            let wasm_bytes = fs::read(&safe_path).await?;
1140            metadata = PackMetadata::from_wasm(&wasm_bytes)
1141                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1142            let name = safe_path
1143                .file_stem()
1144                .map(|s| s.to_string_lossy().to_string())
1145                .unwrap_or_else(|| "component".to_string());
1146            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1147            let mut map = HashMap::new();
1148            map.insert(
1149                name.clone(),
1150                PackComponent {
1151                    name,
1152                    version: metadata.version.clone(),
1153                    component,
1154                },
1155            );
1156            map
1157        } else {
1158            let specs = component_specs(
1159                manifest.as_ref(),
1160                legacy_manifest.as_deref(),
1161                component_sources_payload.as_ref(),
1162                pack_lock.as_ref(),
1163            );
1164            if specs.is_empty() {
1165                HashMap::new()
1166            } else {
1167                let mut loaded = HashMap::new();
1168                let mut missing: HashSet<String> =
1169                    specs.iter().map(|spec| spec.id.clone()).collect();
1170                let mut searched = Vec::new();
1171
1172                if !component_resolution.overrides.is_empty() {
1173                    load_components_from_overrides(
1174                        &cache,
1175                        &engine,
1176                        &component_resolution.overrides,
1177                        &specs,
1178                        &mut missing,
1179                        &mut loaded,
1180                    )
1181                    .await?;
1182                    searched.push("override map".to_string());
1183                }
1184
1185                if let Some(component_sources) = component_sources.as_ref() {
1186                    load_components_from_sources(
1187                        &cache,
1188                        &engine,
1189                        component_sources,
1190                        &component_resolution,
1191                        &specs,
1192                        &mut missing,
1193                        &mut loaded,
1194                        materialized_root.as_deref(),
1195                        archive_hint,
1196                    )
1197                    .await?;
1198                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1199                }
1200
1201                if let Some(root) = materialized_root.as_ref() {
1202                    load_components_from_dir(
1203                        &cache,
1204                        &engine,
1205                        root,
1206                        &specs,
1207                        &mut missing,
1208                        &mut loaded,
1209                    )
1210                    .await?;
1211                    searched.push(format!("components dir {}", root.display()));
1212                }
1213
1214                if let Some(archive_path) = archive_hint {
1215                    load_components_from_archive(
1216                        &cache,
1217                        &engine,
1218                        archive_path,
1219                        &specs,
1220                        &mut missing,
1221                        &mut loaded,
1222                    )
1223                    .await?;
1224                    searched.push(format!("archive {}", archive_path.display()));
1225                }
1226
1227                if !missing.is_empty() {
1228                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1229                    let sources = if searched.is_empty() {
1230                        "no component sources".to_string()
1231                    } else {
1232                        searched.join(", ")
1233                    };
1234                    bail!(
1235                        "components missing: {}; looked in {}",
1236                        missing_list,
1237                        sources
1238                    );
1239                }
1240
1241                loaded
1242            }
1243        };
1244        let http_client = Arc::clone(&HTTP_CLIENT);
1245        let mut component_manifests = HashMap::new();
1246        if let Some(manifest) = manifest.as_ref() {
1247            for component in &manifest.components {
1248                component_manifests.insert(component.id.as_str().to_string(), component.clone());
1249            }
1250        }
1251        Ok(Self {
1252            path: safe_path,
1253            archive_path: archive_hint.map(Path::to_path_buf),
1254            config,
1255            engine,
1256            metadata,
1257            manifest,
1258            legacy_manifest,
1259            component_manifests,
1260            mocks,
1261            flows,
1262            components,
1263            http_client,
1264            pre_cache: Mutex::new(HashMap::new()),
1265            session_store,
1266            state_store,
1267            wasi_policy,
1268            provider_registry: RwLock::new(None),
1269            secrets,
1270            oauth_config,
1271            cache,
1272        })
1273    }
1274
1275    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1276        if let Some(cache) = &self.flows {
1277            return Ok(cache.descriptors.clone());
1278        }
1279        if let Some(manifest) = &self.manifest {
1280            let descriptors = manifest
1281                .flows
1282                .iter()
1283                .map(|flow| FlowDescriptor {
1284                    id: flow.id.as_str().to_string(),
1285                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1286                    pack_id: manifest.pack_id.as_str().to_string(),
1287                    profile: manifest.pack_id.as_str().to_string(),
1288                    version: manifest.version.to_string(),
1289                    description: None,
1290                })
1291                .collect();
1292            return Ok(descriptors);
1293        }
1294        Ok(Vec::new())
1295    }
1296
1297    #[allow(dead_code)]
1298    pub async fn run_flow(
1299        &self,
1300        flow_id: &str,
1301        input: serde_json::Value,
1302    ) -> Result<serde_json::Value> {
1303        let pack = Arc::new(
1304            PackRuntime::load(
1305                &self.path,
1306                Arc::clone(&self.config),
1307                self.mocks.clone(),
1308                self.archive_path.as_deref(),
1309                self.session_store.clone(),
1310                self.state_store.clone(),
1311                Arc::clone(&self.wasi_policy),
1312                self.secrets.clone(),
1313                self.oauth_config.clone(),
1314                false,
1315                ComponentResolution::default(),
1316            )
1317            .await?,
1318        );
1319
1320        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1321        let retry_config = self.config.retry_config().into();
1322        let mocks = pack.mocks.as_deref();
1323        let tenant = self.config.tenant.as_str();
1324
1325        let ctx = FlowContext {
1326            tenant,
1327            pack_id: pack.metadata().pack_id.as_str(),
1328            flow_id,
1329            node_id: None,
1330            tool: None,
1331            action: None,
1332            session_id: None,
1333            provider_id: None,
1334            retry_config,
1335            observer: None,
1336            mocks,
1337        };
1338
1339        let execution = engine.execute(ctx, input).await?;
1340        match execution.status {
1341            FlowStatus::Completed => Ok(execution.output),
1342            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1343                "status": "pending",
1344                "reason": wait.reason,
1345                "resume": wait.snapshot,
1346                "response": execution.output,
1347            })),
1348        }
1349    }
1350
1351    pub async fn invoke_component(
1352        &self,
1353        component_ref: &str,
1354        ctx: ComponentExecCtx,
1355        operation: &str,
1356        _config_json: Option<String>,
1357        input_json: String,
1358    ) -> Result<Value> {
1359        let pack_component = self
1360            .components
1361            .get(component_ref)
1362            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1363
1364        let mut linker = Linker::new(&self.engine);
1365        let allow_state_store = self.allows_state_store(component_ref);
1366        register_all(&mut linker, allow_state_store)?;
1367        add_component_control_to_linker(&mut linker)?;
1368
1369        let host_state = HostState::new(
1370            Arc::clone(&self.config),
1371            Arc::clone(&self.http_client),
1372            self.mocks.clone(),
1373            self.session_store.clone(),
1374            self.state_store.clone(),
1375            Arc::clone(&self.secrets),
1376            self.oauth_config.clone(),
1377            Some(ctx.clone()),
1378        )?;
1379        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1380        let mut store = wasmtime::Store::new(&self.engine, store_state);
1381        let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1382        let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1383            Ok(pre) => {
1384                let bindings = pre.instantiate_async(&mut store).await?;
1385                let node = bindings.greentic_component_node();
1386                let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1387                let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1388                component_api::invoke_result_from_v0_5(result)
1389            }
1390            Err(err) => {
1391                if is_missing_node_export(&err, "0.5.0") {
1392                    let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1393                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
1394                        component_api::v0_4::ComponentPre::new(pre_instance)?;
1395                    let bindings = pre.instantiate_async(&mut store).await?;
1396                    let node = bindings.greentic_component_node();
1397                    let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1398                    let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1399                    component_api::invoke_result_from_v0_4(result)
1400                } else {
1401                    return Err(err);
1402                }
1403            }
1404        };
1405
1406        match result {
1407            InvokeResult::Ok(body) => {
1408                if body.is_empty() {
1409                    return Ok(Value::Null);
1410                }
1411                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1412            }
1413            InvokeResult::Err(NodeError {
1414                code,
1415                message,
1416                retryable,
1417                backoff_ms,
1418                details,
1419            }) => {
1420                let mut obj = serde_json::Map::new();
1421                obj.insert("ok".into(), Value::Bool(false));
1422                let mut error = serde_json::Map::new();
1423                error.insert("code".into(), Value::String(code));
1424                error.insert("message".into(), Value::String(message));
1425                error.insert("retryable".into(), Value::Bool(retryable));
1426                if let Some(backoff) = backoff_ms {
1427                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1428                }
1429                if let Some(details) = details {
1430                    error.insert(
1431                        "details".into(),
1432                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1433                    );
1434                }
1435                obj.insert("error".into(), Value::Object(error));
1436                Ok(Value::Object(obj))
1437            }
1438        }
1439    }
1440
1441    pub fn resolve_provider(
1442        &self,
1443        provider_id: Option<&str>,
1444        provider_type: Option<&str>,
1445    ) -> Result<ProviderBinding> {
1446        let registry = self.provider_registry()?;
1447        registry.resolve(provider_id, provider_type)
1448    }
1449
1450    pub async fn invoke_provider(
1451        &self,
1452        binding: &ProviderBinding,
1453        ctx: ComponentExecCtx,
1454        op: &str,
1455        input_json: Vec<u8>,
1456    ) -> Result<Value> {
1457        let component_ref = &binding.component_ref;
1458        let pack_component = self
1459            .components
1460            .get(component_ref)
1461            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1462
1463        let mut linker = Linker::new(&self.engine);
1464        let allow_state_store = self.allows_state_store(component_ref);
1465        register_all(&mut linker, allow_state_store)?;
1466        add_component_control_to_linker(&mut linker)?;
1467        let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1468        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1469
1470        let host_state = HostState::new(
1471            Arc::clone(&self.config),
1472            Arc::clone(&self.http_client),
1473            self.mocks.clone(),
1474            self.session_store.clone(),
1475            self.state_store.clone(),
1476            Arc::clone(&self.secrets),
1477            self.oauth_config.clone(),
1478            Some(ctx),
1479        )?;
1480        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1481        let mut store = wasmtime::Store::new(&self.engine, store_state);
1482        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1483        let provider = bindings.greentic_provider_core_schema_core_api();
1484
1485        let result = provider.call_invoke(&mut store, op, &input_json)?;
1486        deserialize_json_bytes(result)
1487    }
1488
1489    fn provider_registry(&self) -> Result<ProviderRegistry> {
1490        if let Some(registry) = self.provider_registry.read().clone() {
1491            return Ok(registry);
1492        }
1493        let manifest = self
1494            .manifest
1495            .as_ref()
1496            .context("pack manifest required for provider resolution")?;
1497        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1498        let registry = ProviderRegistry::new(
1499            manifest,
1500            self.state_store.clone(),
1501            &self.config.tenant,
1502            &env,
1503        )?;
1504        *self.provider_registry.write() = Some(registry.clone());
1505        Ok(registry)
1506    }
1507
1508    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1509        if let Some(cache) = &self.flows {
1510            return cache
1511                .flows
1512                .get(flow_id)
1513                .cloned()
1514                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1515        }
1516        if let Some(manifest) = &self.manifest {
1517            let entry = manifest
1518                .flows
1519                .iter()
1520                .find(|f| f.id.as_str() == flow_id)
1521                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1522            return Ok(entry.flow.clone());
1523        }
1524        bail!("flow '{flow_id}' not available (pack exports disabled)")
1525    }
1526
1527    pub fn metadata(&self) -> &PackMetadata {
1528        &self.metadata
1529    }
1530
1531    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1532        &self.metadata.secret_requirements
1533    }
1534
1535    pub fn missing_secrets(
1536        &self,
1537        tenant_ctx: &TypesTenantCtx,
1538    ) -> Vec<greentic_types::SecretRequirement> {
1539        let env = tenant_ctx.env.as_str().to_string();
1540        let tenant = tenant_ctx.tenant.as_str().to_string();
1541        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1542        self.required_secrets()
1543            .iter()
1544            .filter(|req| {
1545                // scope must match current context if provided
1546                if let Some(scope) = &req.scope {
1547                    if scope.env != env {
1548                        return false;
1549                    }
1550                    if scope.tenant != tenant {
1551                        return false;
1552                    }
1553                    if let Some(ref team_req) = scope.team
1554                        && team.as_ref() != Some(team_req)
1555                    {
1556                        return false;
1557                    }
1558                }
1559                let ctx = self.config.tenant_ctx();
1560                read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1561            })
1562            .cloned()
1563            .collect()
1564    }
1565
1566    pub fn for_component_test(
1567        components: Vec<(String, PathBuf)>,
1568        flows: HashMap<String, FlowIR>,
1569        pack_id: &str,
1570        config: Arc<HostConfig>,
1571    ) -> Result<Self> {
1572        let engine = Engine::default();
1573        let engine_profile =
1574            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1575        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1576        let mut component_map = HashMap::new();
1577        for (name, path) in components {
1578            if !path.exists() {
1579                bail!("component artifact missing: {}", path.display());
1580            }
1581            let wasm_bytes = std::fs::read(&path)?;
1582            let component = Arc::new(
1583                Component::from_binary(&engine, &wasm_bytes)
1584                    .with_context(|| format!("failed to compile component {}", path.display()))?,
1585            );
1586            component_map.insert(
1587                name.clone(),
1588                PackComponent {
1589                    name,
1590                    version: "0.0.0".into(),
1591                    component,
1592                },
1593            );
1594        }
1595
1596        let mut flow_map = HashMap::new();
1597        let mut descriptors = Vec::new();
1598        for (id, ir) in flows {
1599            let flow_type = ir.flow_type.clone();
1600            let flow = flow_ir_to_flow(ir)?;
1601            flow_map.insert(id.clone(), flow);
1602            descriptors.push(FlowDescriptor {
1603                id: id.clone(),
1604                flow_type,
1605                pack_id: pack_id.to_string(),
1606                profile: "test".into(),
1607                version: "0.0.0".into(),
1608                description: None,
1609            });
1610        }
1611        let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1612        let metadata = PackMetadata {
1613            pack_id: pack_id.to_string(),
1614            version: "0.0.0".into(),
1615            entry_flows,
1616            secret_requirements: Vec::new(),
1617        };
1618        let flows_cache = PackFlows {
1619            descriptors: descriptors.clone(),
1620            flows: flow_map,
1621            metadata: metadata.clone(),
1622        };
1623
1624        Ok(Self {
1625            path: PathBuf::new(),
1626            archive_path: None,
1627            config,
1628            engine,
1629            metadata,
1630            manifest: None,
1631            legacy_manifest: None,
1632            component_manifests: HashMap::new(),
1633            mocks: None,
1634            flows: Some(flows_cache),
1635            components: component_map,
1636            http_client: Arc::clone(&HTTP_CLIENT),
1637            pre_cache: Mutex::new(HashMap::new()),
1638            session_store: None,
1639            state_store: None,
1640            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1641            provider_registry: RwLock::new(None),
1642            secrets: crate::secrets::default_manager()?,
1643            oauth_config: None,
1644            cache,
1645        })
1646    }
1647}
1648
1649fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1650    let message = err.to_string();
1651    message.contains("no exported instance named")
1652        && message.contains(&format!("greentic:component/node@{version}"))
1653}
1654
/// Flow data for a single pack: descriptors, flow definitions and pack metadata.
struct PackFlows {
    /// One descriptor per flow.
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by the flow id rendered as a string.
    flows: HashMap<String, Flow>,
    /// Pack-level metadata kept alongside the flows.
    metadata: PackMetadata,
}
1660
/// Extension keys recognised as carrying runtime flow definitions.
///
/// `flows_from_runtime_extension` uses the first manifest extension whose key
/// equals any of these ids.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1666
/// Object-shaped runtime flow payload: a wrapper holding a `flows` list.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// Flows carried by the bundle.
    flows: Vec<RuntimeFlow>,
}
1671
/// Loosely-typed flow as found in a runtime flow extension; converted into a
/// `Flow` by `runtime_flow_to_flow`.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow id as a raw string; validated during conversion.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Schema version; conversion substitutes "1.0" when absent.
    #[serde(default)]
    schema_version: Option<String>,
    /// Optional start node; conversion registers it as the "default"
    /// entrypoint when `entrypoints` is empty.
    #[serde(default)]
    start: Option<String>,
    /// Named entrypoints, passed through to the converted flow.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by raw node id; ids are validated during conversion.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; conversion falls back to the default value.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
1687
/// Loosely-typed node of a [`RuntimeFlow`].
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component id as a raw string; also accepted under the key `component`.
    #[serde(alias = "component")]
    component_id: String,
    /// Optional operation name; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Operation payload; also accepted under `payload` or `input`.
    /// Becomes the node's input mapping during conversion.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Optional routing; conversion uses `Routing::End` when absent.
    #[serde(default)]
    routing: Option<Routing>,
    /// Optional telemetry hints; conversion uses the default when absent.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
1701
1702fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1703    if bytes.is_empty() {
1704        return Ok(Value::Null);
1705    }
1706    serde_json::from_slice(&bytes).or_else(|_| {
1707        String::from_utf8(bytes)
1708            .map(Value::String)
1709            .map_err(|err| anyhow!(err))
1710    })
1711}
1712
1713impl PackFlows {
1714    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1715        if let Some(flows) = flows_from_runtime_extension(&manifest) {
1716            return flows;
1717        }
1718        let descriptors = manifest
1719            .flows
1720            .iter()
1721            .map(|entry| FlowDescriptor {
1722                id: entry.id.as_str().to_string(),
1723                flow_type: flow_kind_to_str(entry.kind).to_string(),
1724                pack_id: manifest.pack_id.as_str().to_string(),
1725                profile: manifest.pack_id.as_str().to_string(),
1726                version: manifest.version.to_string(),
1727                description: None,
1728            })
1729            .collect();
1730        let mut flows = HashMap::new();
1731        for entry in &manifest.flows {
1732            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1733        }
1734        Self {
1735            metadata: PackMetadata::from_manifest(&manifest),
1736            descriptors,
1737            flows,
1738        }
1739    }
1740}
1741
1742fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1743    let extensions = manifest.extensions.as_ref()?;
1744    let extension = extensions.iter().find_map(|(key, ext)| {
1745        if RUNTIME_FLOW_EXTENSION_IDS
1746            .iter()
1747            .any(|candidate| candidate == key)
1748        {
1749            Some(ext)
1750        } else {
1751            None
1752        }
1753    })?;
1754    let runtime_flows = match decode_runtime_flow_extension(extension) {
1755        Some(flows) if !flows.is_empty() => flows,
1756        _ => return None,
1757    };
1758
1759    let descriptors = runtime_flows
1760        .iter()
1761        .map(|flow| FlowDescriptor {
1762            id: flow.id.as_str().to_string(),
1763            flow_type: flow_kind_to_str(flow.kind).to_string(),
1764            pack_id: manifest.pack_id.as_str().to_string(),
1765            profile: manifest.pack_id.as_str().to_string(),
1766            version: manifest.version.to_string(),
1767            description: None,
1768        })
1769        .collect::<Vec<_>>();
1770    let flows = runtime_flows
1771        .into_iter()
1772        .map(|flow| (flow.id.as_str().to_string(), flow))
1773        .collect();
1774
1775    Some(PackFlows {
1776        metadata: PackMetadata::from_manifest(manifest),
1777        descriptors,
1778        flows,
1779    })
1780}
1781
1782fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1783    let value = match extension.inline.as_ref()? {
1784        ExtensionInline::Other(value) => value.clone(),
1785        _ => return None,
1786    };
1787
1788    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1789        return Some(collect_runtime_flows(bundle.flows));
1790    }
1791
1792    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1793        return Some(collect_runtime_flows(flows));
1794    }
1795
1796    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1797        return Some(flows);
1798    }
1799
1800    warn!(
1801        extension = %extension.kind,
1802        version = %extension.version,
1803        "runtime flow extension present but could not be decoded"
1804    );
1805    None
1806}
1807
1808fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1809    flows
1810        .into_iter()
1811        .filter_map(|flow| match runtime_flow_to_flow(flow) {
1812            Ok(flow) => Some(flow),
1813            Err(err) => {
1814                warn!(error = %err, "failed to decode runtime flow");
1815                None
1816            }
1817        })
1818        .collect()
1819}
1820
1821fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1822    let flow_id = FlowId::from_str(&runtime.id)
1823        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1824    let mut entrypoints = runtime.entrypoints;
1825    if entrypoints.is_empty()
1826        && let Some(start) = &runtime.start
1827    {
1828        entrypoints.insert("default".into(), Value::String(start.clone()));
1829    }
1830
1831    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1832    for (id, node) in runtime.nodes {
1833        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1834        let component_id = ComponentId::from_str(&node.component_id)
1835            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1836        let component = FlowComponentRef {
1837            id: component_id,
1838            pack_alias: None,
1839            operation: node.operation_name,
1840        };
1841        let routing = node.routing.unwrap_or(Routing::End);
1842        let telemetry = node.telemetry.unwrap_or_default();
1843        nodes.insert(
1844            node_id.clone(),
1845            Node {
1846                id: node_id,
1847                component,
1848                input: InputMapping {
1849                    mapping: node.operation_payload,
1850                },
1851                output: OutputMapping {
1852                    mapping: Value::Null,
1853                },
1854                routing,
1855                telemetry,
1856            },
1857        );
1858    }
1859
1860    Ok(Flow {
1861        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1862        id: flow_id,
1863        kind: runtime.kind,
1864        entrypoints,
1865        nodes,
1866        metadata: runtime.metadata.unwrap_or_default(),
1867    })
1868}
1869
1870fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1871    match kind {
1872        greentic_types::FlowKind::Messaging => "messaging",
1873        greentic_types::FlowKind::Event => "event",
1874        greentic_types::FlowKind::ComponentConfig => "component-config",
1875        greentic_types::FlowKind::Job => "job",
1876        greentic_types::FlowKind::Http => "http",
1877    }
1878}
1879
1880fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1881    let mut file = archive
1882        .by_name(name)
1883        .with_context(|| format!("entry {name} missing from archive"))?;
1884    let mut buf = Vec::new();
1885    file.read_to_end(&mut buf)?;
1886    Ok(buf)
1887}
1888
1889fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1890    for node in doc.nodes.values_mut() {
1891        let Some((component_ref, payload)) = node
1892            .raw
1893            .iter()
1894            .next()
1895            .map(|(key, value)| (key.clone(), value.clone()))
1896        else {
1897            continue;
1898        };
1899        if component_ref.starts_with("emit.") {
1900            node.operation = Some(component_ref);
1901            node.payload = payload;
1902            node.raw.clear();
1903            continue;
1904        }
1905        let (target_component, operation, input, config) =
1906            infer_component_exec(&payload, &component_ref);
1907        let mut payload_obj = serde_json::Map::new();
1908        // component.exec is meta; ensure the payload carries the actual target component.
1909        payload_obj.insert("component".into(), Value::String(target_component));
1910        payload_obj.insert("operation".into(), Value::String(operation));
1911        payload_obj.insert("input".into(), input);
1912        if let Some(cfg) = config {
1913            payload_obj.insert("config".into(), cfg);
1914        }
1915        node.operation = Some("component.exec".to_string());
1916        node.payload = Value::Object(payload_obj);
1917        node.raw.clear();
1918    }
1919    doc
1920}
1921
1922fn infer_component_exec(
1923    payload: &Value,
1924    component_ref: &str,
1925) -> (String, String, Value, Option<Value>) {
1926    let default_op = if component_ref.starts_with("templating.") {
1927        "render"
1928    } else {
1929        "invoke"
1930    }
1931    .to_string();
1932
1933    if let Value::Object(map) = payload {
1934        let op = map
1935            .get("op")
1936            .or_else(|| map.get("operation"))
1937            .and_then(Value::as_str)
1938            .map(|s| s.to_string())
1939            .unwrap_or_else(|| default_op.clone());
1940
1941        let mut input = map.clone();
1942        let config = input.remove("config");
1943        let component = input
1944            .get("component")
1945            .or_else(|| input.get("component_ref"))
1946            .and_then(Value::as_str)
1947            .map(|s| s.to_string())
1948            .unwrap_or_else(|| component_ref.to_string());
1949        input.remove("component");
1950        input.remove("component_ref");
1951        input.remove("op");
1952        input.remove("operation");
1953        return (component, op, Value::Object(input), config);
1954    }
1955
1956    (component_ref.to_string(), default_op, payload.clone(), None)
1957}
1958
/// Identifies a component the pack expects to load.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component id (or component name when no id is available).
    id: String,
    /// Component version; "0.0.0" when derived from a lock file or sources list.
    version: String,
    /// Wasm path from a legacy manifest; when set it is joined onto the pack
    /// root instead of the default `components/<id>.wasm` location.
    legacy_path: Option<String>,
}
1965
/// Where a component comes from and how its bytes should be verified.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Recorded digest for the component, if any.
    digest: Option<String>,
    /// Source reference the component was resolved from.
    source: ComponentSourceRef,
    /// Whether the wasm ships with the pack or lives remotely.
    artifact: ComponentArtifactLocation,
    /// Expected SHA-256 of the bundled wasm; only populated for bundled
    /// pack.lock entries.
    expected_wasm_sha256: Option<String>,
    /// Set only for bundled pack.lock entries that carry no hash while missing
    /// hashes are allowed.
    skip_digest_verification: bool,
}
1974
/// Location of a component's wasm artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// Wasm shipped with the pack; `wasm_path` is resolved against the
    /// materialized pack root or used as the archive entry name.
    Inline { wasm_path: String },
    /// Wasm not shipped with the pack.
    // NOTE(review): presumably fetched through the distributor client — confirm.
    Remote,
}
1980
/// Deserialized form of a pack lock file.
#[derive(Clone, Debug, Deserialize)]
struct PackLockV1 {
    /// Lock schema version; `load_pack_lock` rejects anything other than 1.
    schema_version: u32,
    /// Locked components, in file order.
    components: Vec<PackLockComponent>,
}
1986
/// One component entry in a pack lock file.
///
/// Several fields exist in a current and a legacy spelling; when both are
/// present `component_sources_table_from_pack_lock` requires them to agree.
#[derive(Clone, Debug, Deserialize)]
struct PackLockComponent {
    /// Component name; must be unique within the lock file.
    name: String,
    /// Source reference (current key `source_ref`).
    #[serde(default, rename = "source_ref")]
    source_ref: Option<String>,
    /// Source reference (legacy key `ref`).
    #[serde(default, rename = "ref")]
    legacy_ref: Option<String>,
    /// Optional component id; used as an additional lookup key.
    #[serde(default)]
    component_id: Option<ComponentId>,
    /// Whether the wasm is bundled; a present bundled path also implies bundling.
    #[serde(default)]
    bundled: Option<bool>,
    /// Path of the bundled wasm (current key `bundled_path`).
    #[serde(default, rename = "bundled_path")]
    bundled_path: Option<String>,
    /// Path of the bundled wasm (legacy key `path`).
    #[serde(default, rename = "path")]
    legacy_path: Option<String>,
    /// SHA-256 of the bundled wasm (current key `wasm_sha256`).
    #[serde(default)]
    wasm_sha256: Option<String>,
    /// SHA-256 of the bundled wasm (legacy key `sha256`).
    #[serde(default, rename = "sha256")]
    legacy_sha256: Option<String>,
    /// Resolved digest for a remote component; preferred over `digest`.
    #[serde(default)]
    resolved_digest: Option<String>,
    /// Fallback digest for a remote component.
    #[serde(default)]
    digest: Option<String>,
}
2011
2012fn component_specs(
2013    manifest: Option<&greentic_types::PackManifest>,
2014    legacy_manifest: Option<&legacy_pack::PackManifest>,
2015    component_sources: Option<&ComponentSourcesV1>,
2016    pack_lock: Option<&PackLockV1>,
2017) -> Vec<ComponentSpec> {
2018    if let Some(manifest) = manifest {
2019        if !manifest.components.is_empty() {
2020            return manifest
2021                .components
2022                .iter()
2023                .map(|entry| ComponentSpec {
2024                    id: entry.id.as_str().to_string(),
2025                    version: entry.version.to_string(),
2026                    legacy_path: None,
2027                })
2028                .collect();
2029        }
2030        if let Some(lock) = pack_lock {
2031            let mut seen = HashSet::new();
2032            let mut specs = Vec::new();
2033            for entry in &lock.components {
2034                let id = entry
2035                    .component_id
2036                    .as_ref()
2037                    .map(|id| id.as_str())
2038                    .unwrap_or(entry.name.as_str());
2039                if seen.insert(id.to_string()) {
2040                    specs.push(ComponentSpec {
2041                        id: id.to_string(),
2042                        version: "0.0.0".to_string(),
2043                        legacy_path: None,
2044                    });
2045                }
2046            }
2047            return specs;
2048        }
2049        if let Some(sources) = component_sources {
2050            let mut seen = HashSet::new();
2051            let mut specs = Vec::new();
2052            for entry in &sources.components {
2053                let id = entry
2054                    .component_id
2055                    .as_ref()
2056                    .map(|id| id.as_str())
2057                    .unwrap_or(entry.name.as_str());
2058                if seen.insert(id.to_string()) {
2059                    specs.push(ComponentSpec {
2060                        id: id.to_string(),
2061                        version: "0.0.0".to_string(),
2062                        legacy_path: None,
2063                    });
2064                }
2065            }
2066            return specs;
2067        }
2068    }
2069    if let Some(legacy_manifest) = legacy_manifest {
2070        return legacy_manifest
2071            .components
2072            .iter()
2073            .map(|entry| ComponentSpec {
2074                id: entry.name.clone(),
2075                version: entry.version.to_string(),
2076                legacy_path: Some(entry.file_wasm.clone()),
2077            })
2078            .collect();
2079    }
2080    Vec::new()
2081}
2082
2083fn component_sources_table(
2084    sources: Option<&ComponentSourcesV1>,
2085) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2086    let Some(sources) = sources else {
2087        return Ok(None);
2088    };
2089    let mut table = HashMap::new();
2090    for entry in &sources.components {
2091        let artifact = match &entry.artifact {
2092            ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2093                wasm_path: wasm_path.clone(),
2094            },
2095            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2096        };
2097        let info = ComponentSourceInfo {
2098            digest: Some(entry.resolved.digest.clone()),
2099            source: entry.source.clone(),
2100            artifact,
2101            expected_wasm_sha256: None,
2102            skip_digest_verification: false,
2103        };
2104        if let Some(component_id) = entry.component_id.as_ref() {
2105            table.insert(component_id.as_str().to_string(), info.clone());
2106        }
2107        table.insert(entry.name.clone(), info);
2108    }
2109    Ok(Some(table))
2110}
2111
2112fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2113    let lock_path = if path.is_dir() {
2114        let candidate = path.join("pack.lock");
2115        if candidate.exists() {
2116            Some(candidate)
2117        } else {
2118            let candidate = path.join("pack.lock.json");
2119            candidate.exists().then_some(candidate)
2120        }
2121    } else {
2122        None
2123    };
2124    let Some(lock_path) = lock_path else {
2125        return Ok(None);
2126    };
2127    let raw = std::fs::read_to_string(&lock_path)
2128        .with_context(|| format!("failed to read {}", lock_path.display()))?;
2129    let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2130    if lock.schema_version != 1 {
2131        bail!("pack.lock schema_version must be 1");
2132    }
2133    Ok(Some(lock))
2134}
2135
/// Lists the directories in which a pack lock file should be searched for.
///
/// * For a directory pack, the pack path itself is the only root.
/// * Otherwise the roots are the parent and grandparent (when they exist) of
///   `archive_hint` if given, else of `pack_path`.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    let anchor = archive_hint.unwrap_or(pack_path);
    // `ancestors()` yields the path itself first; skip it and keep two levels.
    anchor
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
2160
2161fn normalize_sha256(digest: &str) -> Result<String> {
2162    let trimmed = digest.trim();
2163    if trimmed.is_empty() {
2164        bail!("sha256 digest cannot be empty");
2165    }
2166    if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2167        if stripped.is_empty() {
2168            bail!("sha256 digest must include hex bytes after sha256:");
2169        }
2170        return Ok(trimmed.to_string());
2171    }
2172    if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2173        return Ok(format!("sha256:{trimmed}"));
2174    }
2175    bail!("sha256 digest must be hex or sha256:<hex>");
2176}
2177
/// Builds the component source lookup table from a pack lock file.
///
/// Each entry is registered under its component id (when present) and under
/// its name (when that differs from the id).
///
/// * Bundled entries (flagged `bundled` or carrying a bundled path) become
///   `Inline` artifacts and carry the normalized wasm SHA-256 as both digest
///   and expected hash. A missing hash is only tolerated when
///   `allow_missing_hash` is set, in which case digest verification is skipped.
/// * Other entries become `Remote` artifacts and must carry a resolved digest.
///
/// # Errors
/// Fails on duplicate component names or ids, missing or conflicting
/// refs/paths/hashes, unparsable source refs, invalid SHA-256 values, bundled
/// entries without a path or (unless allowed) without a hash, and
/// non-bundled entries that use a tag ref or lack a resolved digest.
fn component_sources_table_from_pack_lock(
    lock: &PackLockV1,
    allow_missing_hash: bool,
) -> Result<HashMap<String, ComponentSourceInfo>> {
    let mut table = HashMap::new();
    let mut names = HashSet::new();
    for entry in &lock.components {
        if !names.insert(entry.name.clone()) {
            bail!(
                "pack.lock contains duplicate component name `{}`",
                entry.name
            );
        }
        // Current and legacy spellings of the source ref must agree when both are set.
        let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
            (Some(primary), Some(legacy)) => {
                if primary != legacy {
                    bail!(
                        "pack.lock component {} has conflicting refs: {} vs {}",
                        entry.name,
                        primary,
                        legacy
                    );
                }
                primary.as_str()
            }
            (Some(primary), None) => primary.as_str(),
            (None, Some(legacy)) => legacy.as_str(),
            (None, None) => {
                bail!("pack.lock component {} missing source_ref", entry.name);
            }
        };
        let source: ComponentSourceRef = source_ref
            .parse()
            .with_context(|| format!("invalid component ref `{}`", source_ref))?;
        // Same agreement rule for the bundled wasm path.
        let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
            (Some(primary), Some(legacy)) => {
                if primary != legacy {
                    bail!(
                        "pack.lock component {} has conflicting bundled paths: {} vs {}",
                        entry.name,
                        primary,
                        legacy
                    );
                }
                Some(primary.clone())
            }
            (Some(primary), None) => Some(primary.clone()),
            (None, Some(legacy)) => Some(legacy.clone()),
            (None, None) => None,
        };
        // A present path implies bundling even without the explicit flag.
        let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
        let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
            let wasm_path = bundled_path.ok_or_else(|| {
                anyhow!(
                    "pack.lock component {} marked bundled but bundled_path is missing",
                    entry.name
                )
            })?;
            let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
                (Some(primary), Some(legacy)) => {
                    if primary != legacy {
                        bail!(
                            "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
                            entry.name,
                            primary,
                            legacy
                        );
                    }
                    Some(primary.as_str())
                }
                (Some(primary), None) => Some(primary.as_str()),
                (None, Some(legacy)) => Some(legacy.as_str()),
                (None, None) => None,
            };
            let expected = match expected_raw {
                Some(value) => Some(normalize_sha256(value)?),
                None => None,
            };
            if expected.is_none() && !allow_missing_hash {
                bail!(
                    "pack.lock component {} missing wasm_sha256 for bundled component",
                    entry.name
                );
            }
            (
                ComponentArtifactLocation::Inline { wasm_path },
                expected.clone(),
                expected,
                // Verification is skipped only when no hash was recorded at all.
                allow_missing_hash && expected_raw.is_none(),
            )
        } else {
            // Tag refs are rejected for non-bundled components.
            if source.is_tag() {
                bail!(
                    "component {} uses tag ref {} but is not bundled; rebuild the pack",
                    entry.name,
                    source
                );
            }
            let expected = entry
                .resolved_digest
                .as_deref()
                .or(entry.digest.as_deref())
                .ok_or_else(|| {
                    anyhow!(
                        "pack.lock component {} missing resolved_digest for remote component",
                        entry.name
                    )
                })?;
            (
                ComponentArtifactLocation::Remote,
                Some(normalize_digest(expected)),
                None,
                false,
            )
        };
        let info = ComponentSourceInfo {
            digest,
            source,
            artifact,
            expected_wasm_sha256,
            skip_digest_verification,
        };
        if let Some(component_id) = entry.component_id.as_ref() {
            let key = component_id.as_str().to_string();
            // The check covers every key inserted so far, ids and names alike.
            if table.contains_key(&key) {
                bail!(
                    "pack.lock contains duplicate component id `{}`",
                    component_id.as_str()
                );
            }
            table.insert(key, info.clone());
        }
        // Register under the name too, unless it equals the id (already inserted)
        // or, when there is no id, the name is empty.
        if entry.name
            != entry
                .component_id
                .as_ref()
                .map(|id| id.as_str())
                .unwrap_or("")
        {
            table.insert(entry.name.clone(), info);
        }
    }
    Ok(table)
}
2322
2323fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2324    if let Some(path) = &spec.legacy_path {
2325        return root.join(path);
2326    }
2327    root.join("components").join(format!("{}.wasm", spec.id))
2328}
2329
/// Ensures a digest string carries an algorithm prefix.
///
/// Digests already starting with `sha256:` or `blake3:` are returned
/// unchanged; anything else is assumed to be SHA-256 and gets `sha256:`
/// prepended.
fn normalize_digest(digest: &str) -> String {
    let already_prefixed = ["sha256:", "blake3:"]
        .iter()
        .any(|prefix| digest.starts_with(*prefix));
    if already_prefixed {
        return digest.to_string();
    }
    format!("sha256:{digest}")
}
2337
2338fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2339    if digest.starts_with("blake3:") {
2340        let hash = blake3::hash(bytes);
2341        return Ok(format!("blake3:{}", hash.to_hex()));
2342    }
2343    let mut hasher = sha2::Sha256::new();
2344    hasher.update(bytes);
2345    Ok(format!("sha256:{:x}", hasher.finalize()))
2346}
2347
2348fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2349    let mut hasher = sha2::Sha256::new();
2350    hasher.update(bytes);
2351    format!("sha256:{:x}", hasher.finalize())
2352}
2353
2354fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2355    let wasm_digest = digest
2356        .map(normalize_digest)
2357        .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2358    ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2359}
2360
2361async fn compile_component_with_cache(
2362    cache: &CacheManager,
2363    engine: &Engine,
2364    digest: Option<&str>,
2365    bytes: Vec<u8>,
2366) -> Result<Arc<Component>> {
2367    let key = build_artifact_key(cache, digest, &bytes);
2368    cache.get_component(engine, &key, || Ok(bytes)).await
2369}
2370
2371fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2372    let normalized_expected = normalize_digest(expected);
2373    let actual = compute_digest_for(bytes, &normalized_expected)?;
2374    if normalize_digest(&actual) != normalized_expected {
2375        bail!(
2376            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2377        );
2378    }
2379    Ok(())
2380}
2381
2382fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2383    let normalized_expected = normalize_sha256(expected)?;
2384    let actual = compute_sha256_digest_for(bytes);
2385    if actual != normalized_expected {
2386        bail!(
2387            "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2388        );
2389    }
2390    Ok(())
2391}
2392
// Tests for pack.lock handling and bundled component verification.
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    // A non-bundled component that points at a tag ref must be rejected with a
    // hint to rebuild the pack.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![PackLockComponent {
                name: "templates".to_string(),
                source_ref: Some("oci://registry.test/templates:latest".to_string()),
                legacy_ref: None,
                component_id: None,
                bundled: Some(false),
                bundled_path: None,
                legacy_path: None,
                wasm_sha256: None,
                legacy_sha256: None,
                resolved_digest: None,
                digest: None,
            }],
        };
        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
        assert!(
            err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
            "unexpected error: {err}"
        );
    }

    // Loading an inline component whose bytes do not match the expected wasm
    // SHA-256 must fail with a "bundled digest mismatch" error.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let temp = TempDir::new().expect("temp dir");
        let engine = Engine::default();
        let engine_profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        let cache_config = CacheConfig {
            root: temp.path().join("cache"),
            ..CacheConfig::default()
        };
        let cache = CacheManager::new(cache_config, engine_profile);
        // Copy a real fixture wasm into the temp dir so it acts as the materialized root.
        let wasm_path = temp.path().join("component.wasm");
        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
        std::fs::write(&wasm_path, &bytes).expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::new();
        missing.insert(spec.id.clone());

        // The expected hash is deliberately bogus so verification has to fail.
        let mut sources = HashMap::new();
        sources.insert(
            spec.id.clone(),
            ComponentSourceInfo {
                digest: Some("sha256:deadbeef".to_string()),
                source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
                artifact: ComponentArtifactLocation::Inline {
                    wasm_path: "component.wasm".to_string(),
                },
                expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
                skip_digest_verification: false,
            },
        );

        let mut loaded = HashMap::new();
        let result = rt.block_on(load_components_from_sources(
            &cache,
            &engine,
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(temp.path()),
            None,
        ));
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
2482
2483fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
2484    let mut opts = DistOptions {
2485        allow_tags: true,
2486        ..DistOptions::default()
2487    };
2488    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2489        opts.cache_dir = cache_dir;
2490    }
2491    if component_resolution.dist_offline {
2492        opts.offline = true;
2493    }
2494    opts
2495}
2496
2497#[allow(clippy::too_many_arguments)]
2498async fn load_components_from_sources(
2499    cache: &CacheManager,
2500    engine: &Engine,
2501    component_sources: &HashMap<String, ComponentSourceInfo>,
2502    component_resolution: &ComponentResolution,
2503    specs: &[ComponentSpec],
2504    missing: &mut HashSet<String>,
2505    into: &mut HashMap<String, PackComponent>,
2506    materialized_root: Option<&Path>,
2507    archive_hint: Option<&Path>,
2508) -> Result<()> {
2509    let mut archive = if let Some(path) = archive_hint {
2510        Some(
2511            ZipArchive::new(File::open(path)?)
2512                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
2513        )
2514    } else {
2515        None
2516    };
2517    let mut dist_client: Option<DistClient> = None;
2518
2519    for spec in specs {
2520        if !missing.contains(&spec.id) {
2521            continue;
2522        }
2523        let Some(source) = component_sources.get(&spec.id) else {
2524            continue;
2525        };
2526
2527        let bytes = match &source.artifact {
2528            ComponentArtifactLocation::Inline { wasm_path } => {
2529                if let Some(root) = materialized_root {
2530                    let path = root.join(wasm_path);
2531                    if path.exists() {
2532                        std::fs::read(&path).with_context(|| {
2533                            format!(
2534                                "failed to read inline component {} from {}",
2535                                spec.id,
2536                                path.display()
2537                            )
2538                        })?
2539                    } else if archive.is_none() {
2540                        bail!("inline component {} missing at {}", spec.id, path.display());
2541                    } else {
2542                        read_entry(
2543                            archive.as_mut().expect("archive present when needed"),
2544                            wasm_path,
2545                        )
2546                        .with_context(|| {
2547                            format!(
2548                                "inline component {} missing at {} in pack archive",
2549                                spec.id, wasm_path
2550                            )
2551                        })?
2552                    }
2553                } else if let Some(archive) = archive.as_mut() {
2554                    read_entry(archive, wasm_path).with_context(|| {
2555                        format!(
2556                            "inline component {} missing at {} in pack archive",
2557                            spec.id, wasm_path
2558                        )
2559                    })?
2560                } else {
2561                    bail!(
2562                        "inline component {} missing and no pack source available",
2563                        spec.id
2564                    );
2565                }
2566            }
2567            ComponentArtifactLocation::Remote => {
2568                if source.source.is_tag() {
2569                    bail!(
2570                        "component {} uses tag ref {} but is not bundled; rebuild the pack",
2571                        spec.id,
2572                        source.source
2573                    );
2574                }
2575                let client = dist_client.get_or_insert_with(|| {
2576                    DistClient::new(dist_options_from(component_resolution))
2577                });
2578                let reference = source.source.to_string();
2579                fault::maybe_fail_asset(&reference)
2580                    .await
2581                    .with_context(|| format!("fault injection blocked asset {reference}"))?;
2582                let digest = source.digest.as_deref().ok_or_else(|| {
2583                    anyhow!(
2584                        "component {} missing expected digest for remote component",
2585                        spec.id
2586                    )
2587                })?;
2588                let cache_path = if component_resolution.dist_offline {
2589                    client
2590                        .fetch_digest(digest)
2591                        .await
2592                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2593                } else {
2594                    let resolved = client
2595                        .resolve_ref(&reference)
2596                        .await
2597                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2598                    let expected = normalize_digest(digest);
2599                    let actual = normalize_digest(&resolved.digest);
2600                    if expected != actual {
2601                        bail!(
2602                            "component {} digest mismatch after fetch: expected {}, got {}",
2603                            spec.id,
2604                            expected,
2605                            actual
2606                        );
2607                    }
2608                    resolved.cache_path.ok_or_else(|| {
2609                        anyhow!(
2610                            "component {} resolved from {} but cache path is missing",
2611                            spec.id,
2612                            reference
2613                        )
2614                    })?
2615                };
2616                std::fs::read(&cache_path).with_context(|| {
2617                    format!(
2618                        "failed to read cached component {} from {}",
2619                        spec.id,
2620                        cache_path.display()
2621                    )
2622                })?
2623            }
2624        };
2625
2626        if let Some(expected) = source.expected_wasm_sha256.as_deref() {
2627            verify_wasm_sha256(&spec.id, expected, &bytes)?;
2628        } else if source.skip_digest_verification {
2629            let actual = compute_sha256_digest_for(&bytes);
2630            warn!(
2631                component_id = %spec.id,
2632                digest = %actual,
2633                "bundled component missing wasm_sha256; allowing due to flag"
2634            );
2635        } else {
2636            let expected = source.digest.as_deref().ok_or_else(|| {
2637                anyhow!(
2638                    "component {} missing expected digest for verification",
2639                    spec.id
2640                )
2641            })?;
2642            verify_component_digest(&spec.id, expected, &bytes)?;
2643        }
2644        let component =
2645            compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
2646                .await
2647                .with_context(|| format!("failed to compile component {}", spec.id))?;
2648        into.insert(
2649            spec.id.clone(),
2650            PackComponent {
2651                name: spec.id.clone(),
2652                version: spec.version.clone(),
2653                component,
2654            },
2655        );
2656        missing.remove(&spec.id);
2657    }
2658
2659    Ok(())
2660}
2661
2662fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2663    match err {
2664        DistError::CacheMiss { reference: missing } => anyhow!(
2665            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2666            component_id,
2667            missing,
2668            reference
2669        ),
2670        DistError::Offline { reference: blocked } => anyhow!(
2671            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2672            component_id,
2673            blocked,
2674            reference
2675        ),
2676        DistError::AuthRequired { target } => anyhow!(
2677            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2678            component_id,
2679            target,
2680            reference
2681        ),
2682        other => anyhow!(
2683            "failed to resolve component {} from {}: {}",
2684            component_id,
2685            reference,
2686            other
2687        ),
2688    }
2689}
2690
2691async fn load_components_from_overrides(
2692    cache: &CacheManager,
2693    engine: &Engine,
2694    overrides: &HashMap<String, PathBuf>,
2695    specs: &[ComponentSpec],
2696    missing: &mut HashSet<String>,
2697    into: &mut HashMap<String, PackComponent>,
2698) -> Result<()> {
2699    for spec in specs {
2700        if !missing.contains(&spec.id) {
2701            continue;
2702        }
2703        let Some(path) = overrides.get(&spec.id) else {
2704            continue;
2705        };
2706        let bytes = std::fs::read(path)
2707            .with_context(|| format!("failed to read override component {}", path.display()))?;
2708        let component = compile_component_with_cache(cache, engine, None, bytes)
2709            .await
2710            .with_context(|| {
2711                format!(
2712                    "failed to compile component {} from override {}",
2713                    spec.id,
2714                    path.display()
2715                )
2716            })?;
2717        into.insert(
2718            spec.id.clone(),
2719            PackComponent {
2720                name: spec.id.clone(),
2721                version: spec.version.clone(),
2722                component,
2723            },
2724        );
2725        missing.remove(&spec.id);
2726    }
2727    Ok(())
2728}
2729
2730async fn load_components_from_dir(
2731    cache: &CacheManager,
2732    engine: &Engine,
2733    root: &Path,
2734    specs: &[ComponentSpec],
2735    missing: &mut HashSet<String>,
2736    into: &mut HashMap<String, PackComponent>,
2737) -> Result<()> {
2738    for spec in specs {
2739        if !missing.contains(&spec.id) {
2740            continue;
2741        }
2742        let path = component_path_for_spec(root, spec);
2743        if !path.exists() {
2744            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2745            continue;
2746        }
2747        let bytes = std::fs::read(&path)
2748            .with_context(|| format!("failed to read component {}", path.display()))?;
2749        let component = compile_component_with_cache(cache, engine, None, bytes)
2750            .await
2751            .with_context(|| {
2752                format!(
2753                    "failed to compile component {} from {}",
2754                    spec.id,
2755                    path.display()
2756                )
2757            })?;
2758        into.insert(
2759            spec.id.clone(),
2760            PackComponent {
2761                name: spec.id.clone(),
2762                version: spec.version.clone(),
2763                component,
2764            },
2765        );
2766        missing.remove(&spec.id);
2767    }
2768    Ok(())
2769}
2770
2771async fn load_components_from_archive(
2772    cache: &CacheManager,
2773    engine: &Engine,
2774    path: &Path,
2775    specs: &[ComponentSpec],
2776    missing: &mut HashSet<String>,
2777    into: &mut HashMap<String, PackComponent>,
2778) -> Result<()> {
2779    let mut archive = ZipArchive::new(File::open(path)?)
2780        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2781    for spec in specs {
2782        if !missing.contains(&spec.id) {
2783            continue;
2784        }
2785        let file_name = spec
2786            .legacy_path
2787            .clone()
2788            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2789        let bytes = match read_entry(&mut archive, &file_name) {
2790            Ok(bytes) => bytes,
2791            Err(err) => {
2792                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2793                continue;
2794            }
2795        };
2796        let component = compile_component_with_cache(cache, engine, None, bytes)
2797            .await
2798            .with_context(|| format!("failed to compile component {}", spec.id))?;
2799        into.insert(
2800            spec.id.clone(),
2801            PackComponent {
2802                name: spec.id.clone(),
2803                version: spec.version.clone(),
2804                component,
2805            },
2806        );
2807        missing.remove(&spec.id);
2808    }
2809    Ok(())
2810}
2811
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node written with a raw component key must come out of
    /// `normalize_flow_doc` as an explicit `component.exec` operation whose
    /// payload names the component, the `render` operation and the input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let template_config = json!({ "template": "Hi {{name}}" });

        let mut raw_fields = IndexMap::new();
        raw_fields.insert("templating.handlebars".into(), template_config);

        let start_node = NodeDoc {
            raw: raw_fields,
            routing: json!([{"out": true}]),
            ..Default::default()
        };
        let mut flow_nodes = IndexMap::new();
        flow_nodes.insert("start".into(), start_node);

        let flow = FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            nodes: flow_nodes,
        };

        let result = normalize_flow_doc(flow);
        let start = result.nodes.get("start").expect("node exists");
        assert_eq!(start.operation.as_deref(), Some("component.exec"));
        assert!(start.raw.is_empty());

        let fields = start.payload.as_object().expect("payload object");
        let expected_component = Value::String("templating.handlebars".into());
        assert_eq!(fields.get("component"), Some(&expected_component));
        let expected_operation = Value::String("render".into());
        assert_eq!(fields.get("operation"), Some(&expected_operation));

        let input = fields.get("input").unwrap();
        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
    }
}
2865
2866#[derive(Clone, Debug, Default, Serialize, Deserialize)]
2867pub struct PackMetadata {
2868    pub pack_id: String,
2869    pub version: String,
2870    #[serde(default)]
2871    pub entry_flows: Vec<String>,
2872    #[serde(default)]
2873    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
2874}
2875
2876impl PackMetadata {
2877    fn from_wasm(bytes: &[u8]) -> Option<Self> {
2878        let parser = Parser::new(0);
2879        for payload in parser.parse_all(bytes) {
2880            let payload = payload.ok()?;
2881            match payload {
2882                Payload::CustomSection(section) => {
2883                    if section.name() == "greentic.manifest"
2884                        && let Ok(meta) = Self::from_bytes(section.data())
2885                    {
2886                        return Some(meta);
2887                    }
2888                }
2889                Payload::DataSection(reader) => {
2890                    for segment in reader.into_iter().flatten() {
2891                        if let Ok(meta) = Self::from_bytes(segment.data) {
2892                            return Some(meta);
2893                        }
2894                    }
2895                }
2896                _ => {}
2897            }
2898        }
2899        None
2900    }
2901
2902    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2903        #[derive(Deserialize)]
2904        struct RawManifest {
2905            pack_id: String,
2906            version: String,
2907            #[serde(default)]
2908            entry_flows: Vec<String>,
2909            #[serde(default)]
2910            flows: Vec<RawFlow>,
2911            #[serde(default)]
2912            secret_requirements: Vec<greentic_types::SecretRequirement>,
2913        }
2914
2915        #[derive(Deserialize)]
2916        struct RawFlow {
2917            id: String,
2918        }
2919
2920        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2921        let mut entry_flows = if manifest.entry_flows.is_empty() {
2922            manifest.flows.iter().map(|f| f.id.clone()).collect()
2923        } else {
2924            manifest.entry_flows.clone()
2925        };
2926        entry_flows.retain(|id| !id.is_empty());
2927        Ok(Self {
2928            pack_id: manifest.pack_id,
2929            version: manifest.version,
2930            entry_flows,
2931            secret_requirements: manifest.secret_requirements,
2932        })
2933    }
2934
2935    pub fn fallback(path: &Path) -> Self {
2936        let pack_id = path
2937            .file_stem()
2938            .map(|s| s.to_string_lossy().into_owned())
2939            .unwrap_or_else(|| "unknown-pack".to_string());
2940        Self {
2941            pack_id,
2942            version: "0.0.0".to_string(),
2943            entry_flows: Vec::new(),
2944            secret_requirements: Vec::new(),
2945        }
2946    }
2947
2948    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2949        let entry_flows = manifest
2950            .flows
2951            .iter()
2952            .map(|flow| flow.id.as_str().to_string())
2953            .collect::<Vec<_>>();
2954        Self {
2955            pack_id: manifest.pack_id.as_str().to_string(),
2956            version: manifest.version.to_string(),
2957            entry_flows,
2958            secret_requirements: manifest.secret_requirements.clone(),
2959        }
2960    }
2961}