greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21    self as host_v1, HostFns, add_all_v1_to_linker,
22    runner_host_http::RunnerHostHttp,
23    runner_host_kv::RunnerHostKv,
24    secrets_store::{SecretsError, SecretsStoreHost},
25    state_store::{
26        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27        StateStoreHost, TenantCtx as StateTenantCtx,
28    },
29    telemetry_logger::{
30        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32        TenantCtx as TelemetryTenantCtx,
33    },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
41    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
42    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
43    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44    pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69
70use crate::config::HostConfig;
71use crate::secrets::{DynSecretsManager, read_secret_blocking};
72use crate::storage::state::STATE_PREFIX;
73use crate::storage::{DynSessionStore, DynStateStore};
74use crate::verify;
75use crate::wasi::RunnerWasiPolicy;
76use tracing::warn;
77use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
78use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
79
80use greentic_flow::model::FlowDoc;
81
/// Host-side state kept for one loaded pack: where its artifacts live, the decoded
/// manifest(s) and flows, and the services handed on to component instances.
///
/// NOTE(review): the notes on fields that had no original comment are read off the
/// field types only — confirm against the constructor, which is outside this view.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Component artifact path (wasm file).
    path: PathBuf,
    /// Optional archive (.gtpack) used to load flows/manifests.
    archive_path: Option<PathBuf>,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    engine: Engine,
    metadata: PackMetadata,
    /// Current-format manifest, when one was decoded.
    manifest: Option<greentic_types::PackManifest>,
    /// Legacy-format manifest, when one was decoded.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Component manifests keyed by string — presumably the component id; TODO confirm.
    component_manifests: HashMap<String, ComponentManifest>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    flows: Option<PackFlows>,
    /// Components keyed by string — presumably the component id; TODO confirm.
    components: HashMap<String, PackComponent>,
    /// Shared blocking HTTP client.
    http_client: Arc<BlockingClient>,
    /// Mutex-guarded map of `InstancePre` values keyed by string.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    session_store: Option<DynSessionStore>,
    state_store: Option<DynStateStore>,
    wasi_policy: Arc<RunnerWasiPolicy>,
    provider_registry: RwLock<Option<ProviderRegistry>>,
    secrets: DynSecretsManager,
    oauth_config: Option<OAuthBrokerConfig>,
    cache: CacheManager,
}
107
/// A shared `Component` handle paired with a name and version string.
struct PackComponent {
    /// Component name; currently unread (hence the lint allowance).
    #[allow(dead_code)]
    name: String,
    /// Component version string; currently unread (hence the lint allowance).
    #[allow(dead_code)]
    version: String,
    /// Reference-counted handle to the component.
    component: Arc<Component>,
}
115
/// Options describing where component wasm artifacts may be taken from.
///
/// The `Default` value has no materialized root, no overrides, `dist_offline`
/// and `allow_missing_hash` both `false`, and no cache directory.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
    pub materialized_root: Option<PathBuf>,
    /// Explicit overrides mapping component id -> wasm path.
    pub overrides: HashMap<String, PathBuf>,
    /// If true, do not fetch remote components; require cached artifacts.
    pub dist_offline: bool,
    /// Optional cache directory for resolved remote components.
    pub dist_cache_dir: Option<PathBuf>,
    /// Allow bundled components without wasm_sha256 (dev-only escape hatch).
    pub allow_missing_hash: bool,
}
129
130fn build_blocking_client() -> BlockingClient {
131    std::thread::spawn(|| {
132        BlockingClient::builder()
133            .no_proxy()
134            .build()
135            .expect("blocking client")
136    })
137    .join()
138    .expect("client build thread panicked")
139}
140
141fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
142    let (root, candidate) = if path.is_absolute() {
143        let parent = path
144            .parent()
145            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
146        let root = parent
147            .canonicalize()
148            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
149        let file = path
150            .file_name()
151            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
152        (root, PathBuf::from(file))
153    } else {
154        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
155        let base = if let Some(parent) = path.parent() {
156            cwd.join(parent)
157        } else {
158            cwd
159        };
160        let root = base
161            .canonicalize()
162            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
163        let file = path
164            .file_name()
165            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
166        (root, PathBuf::from(file))
167    };
168    let safe = normalize_under_root(&root, &candidate)?;
169    Ok((root, safe))
170}
171
/// Process-wide blocking HTTP client, created lazily on first access by
/// `build_blocking_client` and shared through an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
173
/// Serializable summary of a flow contained in a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile string; the legacy loaders in this file fill it with the pack id.
    pub profile: String,
    /// Version string; the legacy loaders in this file fill it with the pack version.
    pub version: String,
    /// Optional description; absent input deserializes to `None`.
    #[serde(default)]
    pub description: Option<String>,
}
184
/// Host-side services and policy backing the host interfaces implemented on this type
/// (secrets, HTTP client, state store, telemetry, runner HTTP/KV).
pub struct HostState {
    /// Host configuration; consulted for the HTTP enable flag, the secrets policy and
    /// the fallback tenant.
    config: Arc<HostConfig>,
    /// Blocking client used for outbound HTTP requests.
    http_client: Arc<BlockingClient>,
    /// Environment name used when a call carries no tenant context; taken from
    /// `GREENTIC_ENV`, or `"local"` when that variable cannot be read.
    default_env: String,
    /// Session store handle; not read by the code in this view.
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Backing store for the state-store interface; `None` makes those calls fail with
    /// an `unavailable` error.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real secrets, HTTP and telemetry handling.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used when no mock supplies the value.
    secrets: DynSecretsManager,
    oauth_config: Option<OAuthBrokerConfig>,
    oauth_host: OAuthBrokerHost,
    /// Execution context of the current invocation, when there is one; supplies tenant,
    /// team, user, flow, node, correlation and trace identifiers.
    exec_ctx: Option<ComponentExecCtx>,
}
198
199impl HostState {
200    #[allow(clippy::default_constructed_unit_structs)]
201    #[allow(clippy::too_many_arguments)]
202    pub fn new(
203        config: Arc<HostConfig>,
204        http_client: Arc<BlockingClient>,
205        mocks: Option<Arc<MockLayer>>,
206        session_store: Option<DynSessionStore>,
207        state_store: Option<DynStateStore>,
208        secrets: DynSecretsManager,
209        oauth_config: Option<OAuthBrokerConfig>,
210        exec_ctx: Option<ComponentExecCtx>,
211    ) -> Result<Self> {
212        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
213        Ok(Self {
214            config,
215            http_client,
216            default_env,
217            session_store,
218            state_store,
219            mocks,
220            secrets,
221            oauth_config,
222            oauth_host: OAuthBrokerHost::default(),
223            exec_ctx,
224        })
225    }
226
227    pub fn get_secret(&self, key: &str) -> Result<String> {
228        if provider_core_only::is_enabled() {
229            bail!(provider_core_only::blocked_message("secrets"))
230        }
231        if !self.config.secrets_policy.is_allowed(key) {
232            bail!("secret {key} is not permitted by bindings policy");
233        }
234        if let Some(mock) = &self.mocks
235            && let Some(value) = mock.secrets_lookup(key)
236        {
237            return Ok(value);
238        }
239        let bytes = read_secret_blocking(&self.secrets, key)
240            .context("failed to read secret from manager")?;
241        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
242        Ok(value)
243    }
244
245    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
246        let tenant_raw = ctx
247            .as_ref()
248            .map(|ctx| ctx.tenant.clone())
249            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
250            .unwrap_or_else(|| self.config.tenant.clone());
251        let env_raw = ctx
252            .as_ref()
253            .map(|ctx| ctx.env.clone())
254            .unwrap_or_else(|| self.default_env.clone());
255        let tenant_id = TenantId::from_str(&tenant_raw)
256            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
257        let env_id = EnvId::from_str(&env_raw)
258            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
259        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
260        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
261            if let Some(team) = exec_ctx.tenant.team.as_ref() {
262                let team_id =
263                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
264                tenant_ctx = tenant_ctx.with_team(Some(team_id));
265            }
266            if let Some(user) = exec_ctx.tenant.user.as_ref() {
267                let user_id =
268                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
269                tenant_ctx = tenant_ctx.with_user(Some(user_id));
270            }
271            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
272            if let Some(node) = exec_ctx.node_id.as_ref() {
273                tenant_ctx = tenant_ctx.with_node(node.clone());
274            }
275            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
276                tenant_ctx = tenant_ctx.with_session(session.clone());
277            }
278            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
279        }
280
281        if let Some(ctx) = ctx {
282            if let Some(team) = ctx.team.or(ctx.team_id) {
283                let team_id =
284                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
285                tenant_ctx = tenant_ctx.with_team(Some(team_id));
286            }
287            if let Some(user) = ctx.user.or(ctx.user_id) {
288                let user_id =
289                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
290                tenant_ctx = tenant_ctx.with_user(Some(user_id));
291            }
292            if let Some(flow) = ctx.flow_id {
293                tenant_ctx = tenant_ctx.with_flow(flow);
294            }
295            if let Some(node) = ctx.node_id {
296                tenant_ctx = tenant_ctx.with_node(node);
297            }
298            if let Some(provider) = ctx.provider_id {
299                tenant_ctx = tenant_ctx.with_provider(provider);
300            }
301            if let Some(session) = ctx.session_id {
302                tenant_ctx = tenant_ctx.with_session(session);
303            }
304            tenant_ctx.trace_id = ctx.trace_id;
305        }
306        Ok(tenant_ctx)
307    }
308
309    fn send_http_request(
310        &mut self,
311        req: HttpRequest,
312        opts: Option<HttpRequestOptionsV1_1>,
313        _ctx: Option<HttpTenantCtx>,
314    ) -> Result<HttpResponse, HttpClientError> {
315        if !self.config.http_enabled {
316            return Err(HttpClientError {
317                code: "denied".into(),
318                message: "http client disabled by policy".into(),
319            });
320        }
321
322        let mut mock_state = None;
323        let raw_body = req.body.clone();
324        if let Some(mock) = &self.mocks
325            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
326        {
327            match mock.http_begin(&meta) {
328                HttpDecision::Mock(response) => {
329                    let headers = response
330                        .headers
331                        .iter()
332                        .map(|(k, v)| (k.clone(), v.clone()))
333                        .collect();
334                    return Ok(HttpResponse {
335                        status: response.status,
336                        headers,
337                        body: response.body.clone().map(|b| b.into_bytes()),
338                    });
339                }
340                HttpDecision::Deny(reason) => {
341                    return Err(HttpClientError {
342                        code: "denied".into(),
343                        message: reason,
344                    });
345                }
346                HttpDecision::Passthrough { record } => {
347                    mock_state = Some((meta, record));
348                }
349            }
350        }
351
352        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
353        let mut builder = self.http_client.request(method, &req.url);
354        for (key, value) in req.headers {
355            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
356                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
357            {
358                builder = builder.header(header, header_value);
359            }
360        }
361
362        if let Some(body) = raw_body.clone() {
363            builder = builder.body(body);
364        }
365
366        if let Some(opts) = opts {
367            if let Some(timeout_ms) = opts.timeout_ms {
368                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
369            }
370            if opts.allow_insecure == Some(true) {
371                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
372            }
373            if let Some(follow_redirects) = opts.follow_redirects
374                && !follow_redirects
375            {
376                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
377            }
378        }
379
380        let response = match builder.send() {
381            Ok(resp) => resp,
382            Err(err) => {
383                warn!(url = %req.url, error = %err, "http client request failed");
384                return Err(HttpClientError {
385                    code: "unavailable".into(),
386                    message: err.to_string(),
387                });
388            }
389        };
390
391        let status = response.status().as_u16();
392        let headers_vec = response
393            .headers()
394            .iter()
395            .map(|(k, v)| {
396                (
397                    k.as_str().to_string(),
398                    v.to_str().unwrap_or_default().to_string(),
399                )
400            })
401            .collect::<Vec<_>>();
402        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
403
404        if let Some((meta, true)) = mock_state.take()
405            && let Some(mock) = &self.mocks
406        {
407            let recorded = HttpMockResponse::new(
408                status,
409                headers_vec.clone().into_iter().collect(),
410                body_bytes
411                    .as_ref()
412                    .map(|b| String::from_utf8_lossy(b).into_owned()),
413            );
414            mock.http_record(&meta, &recorded);
415        }
416
417        Ok(HttpResponse {
418            status,
419            headers: headers_vec,
420            body: body_bytes,
421        })
422    }
423}
424
425impl SecretsStoreHost for HostState {
426    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
427        if provider_core_only::is_enabled() {
428            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
429            return Err(SecretsError::Denied);
430        }
431        if !self.config.secrets_policy.is_allowed(&key) {
432            return Err(SecretsError::Denied);
433        }
434        if let Some(mock) = &self.mocks
435            && let Some(value) = mock.secrets_lookup(&key)
436        {
437            return Ok(Some(value.into_bytes()));
438        }
439        match read_secret_blocking(&self.secrets, &key) {
440            Ok(bytes) => Ok(Some(bytes)),
441            Err(err) => {
442                warn!(secret = %key, error = %err, "secret lookup failed");
443                Err(SecretsError::NotFound)
444            }
445        }
446    }
447}
448
449impl HttpClientHost for HostState {
450    fn send(
451        &mut self,
452        req: HttpRequest,
453        ctx: Option<HttpTenantCtx>,
454    ) -> Result<HttpResponse, HttpClientError> {
455        self.send_http_request(req, None, ctx)
456    }
457}
458
459impl HttpClientHostV1_1 for HostState {
460    fn send(
461        &mut self,
462        req: HttpRequestV1_1,
463        opts: Option<HttpRequestOptionsV1_1>,
464        ctx: Option<HttpTenantCtxV1_1>,
465    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
466        let legacy_req = HttpRequest {
467            method: req.method,
468            url: req.url,
469            headers: req.headers,
470            body: req.body,
471        };
472        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
473            env: ctx.env,
474            tenant: ctx.tenant,
475            tenant_id: ctx.tenant_id,
476            team: ctx.team,
477            team_id: ctx.team_id,
478            user: ctx.user,
479            user_id: ctx.user_id,
480            trace_id: ctx.trace_id,
481            correlation_id: ctx.correlation_id,
482            attributes: ctx.attributes,
483            session_id: ctx.session_id,
484            flow_id: ctx.flow_id,
485            node_id: ctx.node_id,
486            provider_id: ctx.provider_id,
487            deadline_ms: ctx.deadline_ms,
488            attempt: ctx.attempt,
489            idempotency_key: ctx.idempotency_key,
490            impersonation: ctx
491                .impersonation
492                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
493                    actor_id,
494                    reason,
495                }),
496        });
497
498        self.send_http_request(legacy_req, opts, legacy_ctx)
499            .map(|resp| HttpResponseV1_1 {
500                status: resp.status,
501                headers: resp.headers,
502                body: resp.body,
503            })
504            .map_err(|err| HttpClientErrorV1_1 {
505                code: err.code,
506                message: err.message,
507            })
508    }
509}
510
511impl StateStoreHost for HostState {
512    fn read(
513        &mut self,
514        key: HostStateKey,
515        ctx: Option<StateTenantCtx>,
516    ) -> Result<Vec<u8>, StateError> {
517        let store = match self.state_store.as_ref() {
518            Some(store) => store.clone(),
519            None => {
520                return Err(StateError {
521                    code: "unavailable".into(),
522                    message: "state store not configured".into(),
523                });
524            }
525        };
526        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
527            Ok(ctx) => ctx,
528            Err(err) => {
529                return Err(StateError {
530                    code: "invalid-ctx".into(),
531                    message: err.to_string(),
532                });
533            }
534        };
535        let key = StoreStateKey::from(key);
536        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
537            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
538            Ok(None) => Err(StateError {
539                code: "not_found".into(),
540                message: "state key not found".into(),
541            }),
542            Err(err) => Err(StateError {
543                code: "internal".into(),
544                message: err.to_string(),
545            }),
546        }
547    }
548
549    fn write(
550        &mut self,
551        key: HostStateKey,
552        bytes: Vec<u8>,
553        ctx: Option<StateTenantCtx>,
554    ) -> Result<StateOpAck, StateError> {
555        let store = match self.state_store.as_ref() {
556            Some(store) => store.clone(),
557            None => {
558                return Err(StateError {
559                    code: "unavailable".into(),
560                    message: "state store not configured".into(),
561                });
562            }
563        };
564        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
565            Ok(ctx) => ctx,
566            Err(err) => {
567                return Err(StateError {
568                    code: "invalid-ctx".into(),
569                    message: err.to_string(),
570                });
571            }
572        };
573        let key = StoreStateKey::from(key);
574        let value = serde_json::from_slice(&bytes)
575            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
576        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
577            Ok(()) => Ok(StateOpAck::Ok),
578            Err(err) => Err(StateError {
579                code: "internal".into(),
580                message: err.to_string(),
581            }),
582        }
583    }
584
585    fn delete(
586        &mut self,
587        key: HostStateKey,
588        ctx: Option<StateTenantCtx>,
589    ) -> Result<StateOpAck, StateError> {
590        let store = match self.state_store.as_ref() {
591            Some(store) => store.clone(),
592            None => {
593                return Err(StateError {
594                    code: "unavailable".into(),
595                    message: "state store not configured".into(),
596                });
597            }
598        };
599        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
600            Ok(ctx) => ctx,
601            Err(err) => {
602                return Err(StateError {
603                    code: "invalid-ctx".into(),
604                    message: err.to_string(),
605                });
606            }
607        };
608        let key = StoreStateKey::from(key);
609        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
610            Ok(_) => Ok(StateOpAck::Ok),
611            Err(err) => Err(StateError {
612                code: "internal".into(),
613                message: err.to_string(),
614            }),
615        }
616    }
617}
618
619impl TelemetryLoggerHost for HostState {
620    fn log(
621        &mut self,
622        span: TelemetrySpanContext,
623        fields: Vec<(String, String)>,
624        _ctx: Option<TelemetryTenantCtx>,
625    ) -> Result<TelemetryAck, TelemetryError> {
626        if let Some(mock) = &self.mocks
627            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
628        {
629            return Ok(TelemetryAck::Ok);
630        }
631        let mut map = serde_json::Map::new();
632        for (k, v) in fields {
633            map.insert(k, Value::String(v));
634        }
635        tracing::info!(
636            tenant = %span.tenant,
637            flow_id = %span.flow_id,
638            node = ?span.node_id,
639            provider = %span.provider,
640            fields = %serde_json::Value::Object(map.clone()),
641            "telemetry log from pack"
642        );
643        Ok(TelemetryAck::Ok)
644    }
645}
646
647impl RunnerHostHttp for HostState {
648    fn request(
649        &mut self,
650        method: String,
651        url: String,
652        headers: Vec<String>,
653        body: Option<Vec<u8>>,
654    ) -> Result<Vec<u8>, String> {
655        let req = HttpRequest {
656            method,
657            url,
658            headers: headers
659                .chunks(2)
660                .filter_map(|chunk| {
661                    if chunk.len() == 2 {
662                        Some((chunk[0].clone(), chunk[1].clone()))
663                    } else {
664                        None
665                    }
666                })
667                .collect(),
668            body,
669        };
670        match HttpClientHost::send(self, req, None) {
671            Ok(resp) => Ok(resp.body.unwrap_or_default()),
672            Err(err) => Err(err.message),
673        }
674    }
675}
676
677impl RunnerHostKv for HostState {
678    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
679        None
680    }
681
682    fn put(&mut self, _ns: String, _key: String, _val: String) {}
683}
684
/// Result of decoding `manifest.cbor`: the manifest in whichever format decoded
/// successfully, together with the flows loaded for it.
enum ManifestLoad {
    /// The manifest decoded with `decode_pack_manifest`; flows are derived from the
    /// manifest itself.
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// The manifest only decoded as a legacy pack manifest; flows are loaded from the
    /// flow JSON files it lists.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
695
696fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
697    let mut archive = ZipArchive::new(File::open(path)?)
698        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
699    let bytes = read_entry(&mut archive, "manifest.cbor")
700        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
701    match decode_pack_manifest(&bytes) {
702        Ok(manifest) => {
703            let cache = PackFlows::from_manifest(manifest.clone());
704            Ok(ManifestLoad::New {
705                manifest: Box::new(manifest),
706                flows: cache,
707            })
708        }
709        Err(err) => {
710            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
711            // Fall back to legacy pack manifest
712            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
713                .context("failed to decode legacy pack manifest from manifest.cbor")?;
714            let flows = load_legacy_flows(&mut archive, &legacy)?;
715            Ok(ManifestLoad::Legacy {
716                manifest: Box::new(legacy),
717                flows,
718            })
719        }
720    }
721}
722
723fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
724    let manifest_path = root.join("manifest.cbor");
725    let bytes = std::fs::read(&manifest_path)
726        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
727    match decode_pack_manifest(&bytes) {
728        Ok(manifest) => {
729            let cache = PackFlows::from_manifest(manifest.clone());
730            Ok(ManifestLoad::New {
731                manifest: Box::new(manifest),
732                flows: cache,
733            })
734        }
735        Err(err) => {
736            tracing::debug!(
737                error = %err,
738                pack = %root.display(),
739                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
740            );
741            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
742                .context("failed to decode legacy pack manifest from manifest.cbor")?;
743            let flows = load_legacy_flows_from_dir(root, &legacy)?;
744            Ok(ManifestLoad::Legacy {
745                manifest: Box::new(legacy),
746                flows,
747            })
748        }
749    }
750}
751
752fn load_legacy_flows(
753    archive: &mut ZipArchive<File>,
754    manifest: &legacy_pack::PackManifest,
755) -> Result<PackFlows> {
756    let mut flows = HashMap::new();
757    let mut descriptors = Vec::new();
758
759    for entry in &manifest.flows {
760        let bytes = read_entry(archive, &entry.file_json)
761            .with_context(|| format!("missing flow json {}", entry.file_json))?;
762        let doc: FlowDoc = serde_json::from_slice(&bytes)
763            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
764        let normalized = normalize_flow_doc(doc);
765        let flow_ir = flow_doc_to_ir(normalized)?;
766        let flow = flow_ir_to_flow(flow_ir)?;
767
768        descriptors.push(FlowDescriptor {
769            id: entry.id.clone(),
770            flow_type: entry.kind.clone(),
771            profile: manifest.meta.pack_id.clone(),
772            version: manifest.meta.version.to_string(),
773            description: None,
774        });
775        flows.insert(entry.id.clone(), flow);
776    }
777
778    let mut entry_flows = manifest.meta.entry_flows.clone();
779    if entry_flows.is_empty() {
780        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
781    }
782    let metadata = PackMetadata {
783        pack_id: manifest.meta.pack_id.clone(),
784        version: manifest.meta.version.to_string(),
785        entry_flows,
786        secret_requirements: Vec::new(),
787    };
788
789    Ok(PackFlows {
790        descriptors,
791        flows,
792        metadata,
793    })
794}
795
796fn load_legacy_flows_from_dir(
797    root: &Path,
798    manifest: &legacy_pack::PackManifest,
799) -> Result<PackFlows> {
800    let mut flows = HashMap::new();
801    let mut descriptors = Vec::new();
802
803    for entry in &manifest.flows {
804        let path = root.join(&entry.file_json);
805        let bytes = std::fs::read(&path)
806            .with_context(|| format!("missing flow json {}", path.display()))?;
807        let doc: FlowDoc = serde_json::from_slice(&bytes)
808            .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
809        let normalized = normalize_flow_doc(doc);
810        let flow_ir = flow_doc_to_ir(normalized)?;
811        let flow = flow_ir_to_flow(flow_ir)?;
812
813        descriptors.push(FlowDescriptor {
814            id: entry.id.clone(),
815            flow_type: entry.kind.clone(),
816            profile: manifest.meta.pack_id.clone(),
817            version: manifest.meta.version.to_string(),
818            description: None,
819        });
820        flows.insert(entry.id.clone(), flow);
821    }
822
823    let mut entry_flows = manifest.meta.entry_flows.clone();
824    if entry_flows.is_empty() {
825        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
826    }
827    let metadata = PackMetadata {
828        pack_id: manifest.meta.pack_id.clone(),
829        version: manifest.meta.version.to_string(),
830        entry_flows,
831        secret_requirements: Vec::new(),
832    };
833
834    Ok(PackFlows {
835        descriptors,
836        flows,
837        metadata,
838    })
839}
840
/// Data carried by a `wasmtime::Store` while a pack component runs.
///
/// Bundles the Greentic host services with the WASI context and the resource
/// table that are handed out together through `WasiView::ctx`.
pub struct ComponentState {
    /// Host-side services (HTTP, secrets, state, telemetry, OAuth) exposed to the component.
    pub host: HostState,
    /// WASI context produced by `RunnerWasiPolicy::instantiate`.
    wasi_ctx: WasiCtx,
    /// Resource table paired with `wasi_ctx` in the WASI view.
    resource_table: ResourceTable,
}
846
847impl ComponentState {
848    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
849        let wasi_ctx = policy
850            .instantiate()
851            .context("failed to build WASI context")?;
852        Ok(Self {
853            host,
854            wasi_ctx,
855            resource_table: ResourceTable::new(),
856        })
857    }
858
859    fn host_mut(&mut self) -> &mut HostState {
860        &mut self.host
861    }
862
863    fn should_cancel_host(&mut self) -> bool {
864        false
865    }
866
867    fn yield_now_host(&mut self) {
868        // no-op cooperative yield
869    }
870}
871
872impl component_api::v0_4::greentic::component::control::Host for ComponentState {
873    fn should_cancel(&mut self) -> bool {
874        self.should_cancel_host()
875    }
876
877    fn yield_now(&mut self) {
878        self.yield_now_host();
879    }
880}
881
882impl component_api::v0_5::greentic::component::control::Host for ComponentState {
883    fn should_cancel(&mut self) -> bool {
884        self.should_cancel_host()
885    }
886
887    fn yield_now(&mut self) {
888        self.yield_now_host();
889    }
890}
891
892fn add_component_control_instance(
893    linker: &mut Linker<ComponentState>,
894    name: &str,
895) -> wasmtime::Result<()> {
896    let mut inst = linker.instance(name)?;
897    inst.func_wrap(
898        "should-cancel",
899        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
900            let host = caller.data_mut();
901            Ok((host.should_cancel_host(),))
902        },
903    )?;
904    inst.func_wrap(
905        "yield-now",
906        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
907            let host = caller.data_mut();
908            host.yield_now_host();
909            Ok(())
910        },
911    )?;
912    Ok(())
913}
914
915fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
916    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
917    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
918    Ok(())
919}
920
/// Registers the host imports a pack component can link against.
///
/// Calls `add_wasi_to_linker` and then `add_all_v1_to_linker` with accessors
/// that all resolve to the `HostState` embedded in `ComponentState`.
///
/// * The OAuth broker interface is not registered here (`oauth_broker: None`).
/// * The state-store interface is registered only when `allow_state_store` is `true`.
pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            // Omitted entirely when the caller has not been granted state access.
            state_store: allow_state_store.then_some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
938
/// Exposes the OAuth-related parts of the embedded `HostState`.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// Environment name stored on the host as `default_env`.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the host's OAuth broker.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, or `None` when the host has none.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
956
957impl WasiView for ComponentState {
958    fn ctx(&mut self) -> WasiCtxView<'_> {
959        WasiCtxView {
960            ctx: &mut self.wasi_ctx,
961            table: &mut self.resource_table,
962        }
963    }
964}
965
// NOTE(review): these impls assert that `ComponentState` may be moved to and shared
// between threads, but no invariant justifying that is written down here. Confirm
// that `HostState`, `WasiCtx` and `ResourceTable` are each safe to send and share,
// and record the reasoning as a `SAFETY:` comment (or drop the impls if the auto
// traits already apply).
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
970
971impl PackRuntime {
972    fn allows_state_store(&self, component_ref: &str) -> bool {
973        if self.state_store.is_none() {
974            return false;
975        }
976        if !self.config.state_store_policy.allow {
977            return false;
978        }
979        let Some(manifest) = self.component_manifests.get(component_ref) else {
980            return false;
981        };
982        manifest
983            .capabilities
984            .host
985            .state
986            .as_ref()
987            .map(|caps| caps.read || caps.write)
988            .unwrap_or(false)
989    }
990
991    #[allow(clippy::too_many_arguments)]
992    pub async fn load(
993        path: impl AsRef<Path>,
994        config: Arc<HostConfig>,
995        mocks: Option<Arc<MockLayer>>,
996        archive_source: Option<&Path>,
997        session_store: Option<DynSessionStore>,
998        state_store: Option<DynStateStore>,
999        wasi_policy: Arc<RunnerWasiPolicy>,
1000        secrets: DynSecretsManager,
1001        oauth_config: Option<OAuthBrokerConfig>,
1002        verify_archive: bool,
1003        component_resolution: ComponentResolution,
1004    ) -> Result<Self> {
1005        let path = path.as_ref();
1006        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1007        let path_meta = std::fs::metadata(&safe_path).ok();
1008        let is_dir = path_meta
1009            .as_ref()
1010            .map(|meta| meta.is_dir())
1011            .unwrap_or(false);
1012        let is_component = !is_dir
1013            && safe_path
1014                .extension()
1015                .and_then(|ext| ext.to_str())
1016                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1017                .unwrap_or(false);
1018        let archive_hint_path = if let Some(source) = archive_source {
1019            let (_, normalized) = normalize_pack_path(source)?;
1020            Some(normalized)
1021        } else if is_component || is_dir {
1022            None
1023        } else {
1024            Some(safe_path.clone())
1025        };
1026        let archive_hint = archive_hint_path.as_deref();
1027        if verify_archive {
1028            if let Some(verify_target) = archive_hint.and_then(|p| {
1029                std::fs::metadata(p)
1030                    .ok()
1031                    .filter(|meta| meta.is_file())
1032                    .map(|_| p)
1033            }) {
1034                verify::verify_pack(verify_target).await?;
1035                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1036            } else {
1037                tracing::debug!("skipping archive verification (no archive source)");
1038            }
1039        }
1040        let engine = Engine::default();
1041        let engine_profile =
1042            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1043        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1044        let mut metadata = PackMetadata::fallback(&safe_path);
1045        let mut manifest = None;
1046        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1047        let mut flows = None;
1048        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1049            if is_dir {
1050                Some(safe_path.clone())
1051            } else {
1052                None
1053            }
1054        });
1055
1056        if let Some(root) = materialized_root.as_ref() {
1057            match load_manifest_and_flows_from_dir(root) {
1058                Ok(ManifestLoad::New {
1059                    manifest: m,
1060                    flows: cache,
1061                }) => {
1062                    metadata = cache.metadata.clone();
1063                    manifest = Some(*m);
1064                    flows = Some(cache);
1065                }
1066                Ok(ManifestLoad::Legacy {
1067                    manifest: m,
1068                    flows: cache,
1069                }) => {
1070                    metadata = cache.metadata.clone();
1071                    legacy_manifest = Some(m);
1072                    flows = Some(cache);
1073                }
1074                Err(err) => {
1075                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1076                }
1077            }
1078        }
1079
1080        if manifest.is_none()
1081            && legacy_manifest.is_none()
1082            && let Some(archive_path) = archive_hint
1083        {
1084            match load_manifest_and_flows(archive_path) {
1085                Ok(ManifestLoad::New {
1086                    manifest: m,
1087                    flows: cache,
1088                }) => {
1089                    metadata = cache.metadata.clone();
1090                    manifest = Some(*m);
1091                    flows = Some(cache);
1092                }
1093                Ok(ManifestLoad::Legacy {
1094                    manifest: m,
1095                    flows: cache,
1096                }) => {
1097                    metadata = cache.metadata.clone();
1098                    legacy_manifest = Some(m);
1099                    flows = Some(cache);
1100                }
1101                Err(err) => {
1102                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1103                }
1104            }
1105        }
1106        let mut pack_lock = None;
1107        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1108            pack_lock = load_pack_lock(&root)?;
1109            if pack_lock.is_some() {
1110                break;
1111            }
1112        }
1113        let component_sources_payload = if pack_lock.is_none() {
1114            if let Some(manifest) = manifest.as_ref() {
1115                manifest
1116                    .get_component_sources_v1()
1117                    .context("invalid component sources extension")?
1118            } else {
1119                None
1120            }
1121        } else {
1122            None
1123        };
1124        let component_sources = if let Some(lock) = pack_lock.as_ref() {
1125            Some(component_sources_table_from_pack_lock(
1126                lock,
1127                component_resolution.allow_missing_hash,
1128            )?)
1129        } else {
1130            component_sources_table(component_sources_payload.as_ref())?
1131        };
1132        let components = if is_component {
1133            let wasm_bytes = fs::read(&safe_path).await?;
1134            metadata = PackMetadata::from_wasm(&wasm_bytes)
1135                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1136            let name = safe_path
1137                .file_stem()
1138                .map(|s| s.to_string_lossy().to_string())
1139                .unwrap_or_else(|| "component".to_string());
1140            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1141            let mut map = HashMap::new();
1142            map.insert(
1143                name.clone(),
1144                PackComponent {
1145                    name,
1146                    version: metadata.version.clone(),
1147                    component,
1148                },
1149            );
1150            map
1151        } else {
1152            let specs = component_specs(
1153                manifest.as_ref(),
1154                legacy_manifest.as_deref(),
1155                component_sources_payload.as_ref(),
1156                pack_lock.as_ref(),
1157            );
1158            if specs.is_empty() {
1159                HashMap::new()
1160            } else {
1161                let mut loaded = HashMap::new();
1162                let mut missing: HashSet<String> =
1163                    specs.iter().map(|spec| spec.id.clone()).collect();
1164                let mut searched = Vec::new();
1165
1166                if !component_resolution.overrides.is_empty() {
1167                    load_components_from_overrides(
1168                        &cache,
1169                        &engine,
1170                        &component_resolution.overrides,
1171                        &specs,
1172                        &mut missing,
1173                        &mut loaded,
1174                    )
1175                    .await?;
1176                    searched.push("override map".to_string());
1177                }
1178
1179                if let Some(component_sources) = component_sources.as_ref() {
1180                    load_components_from_sources(
1181                        &cache,
1182                        &engine,
1183                        component_sources,
1184                        &component_resolution,
1185                        &specs,
1186                        &mut missing,
1187                        &mut loaded,
1188                        materialized_root.as_deref(),
1189                        archive_hint,
1190                    )
1191                    .await?;
1192                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1193                }
1194
1195                if let Some(root) = materialized_root.as_ref() {
1196                    load_components_from_dir(
1197                        &cache,
1198                        &engine,
1199                        root,
1200                        &specs,
1201                        &mut missing,
1202                        &mut loaded,
1203                    )
1204                    .await?;
1205                    searched.push(format!("components dir {}", root.display()));
1206                }
1207
1208                if let Some(archive_path) = archive_hint {
1209                    load_components_from_archive(
1210                        &cache,
1211                        &engine,
1212                        archive_path,
1213                        &specs,
1214                        &mut missing,
1215                        &mut loaded,
1216                    )
1217                    .await?;
1218                    searched.push(format!("archive {}", archive_path.display()));
1219                }
1220
1221                if !missing.is_empty() {
1222                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1223                    let sources = if searched.is_empty() {
1224                        "no component sources".to_string()
1225                    } else {
1226                        searched.join(", ")
1227                    };
1228                    bail!(
1229                        "components missing: {}; looked in {}",
1230                        missing_list,
1231                        sources
1232                    );
1233                }
1234
1235                loaded
1236            }
1237        };
1238        let http_client = Arc::clone(&HTTP_CLIENT);
1239        let mut component_manifests = HashMap::new();
1240        if let Some(manifest) = manifest.as_ref() {
1241            for component in &manifest.components {
1242                component_manifests.insert(component.id.as_str().to_string(), component.clone());
1243            }
1244        }
1245        Ok(Self {
1246            path: safe_path,
1247            archive_path: archive_hint.map(Path::to_path_buf),
1248            config,
1249            engine,
1250            metadata,
1251            manifest,
1252            legacy_manifest,
1253            component_manifests,
1254            mocks,
1255            flows,
1256            components,
1257            http_client,
1258            pre_cache: Mutex::new(HashMap::new()),
1259            session_store,
1260            state_store,
1261            wasi_policy,
1262            provider_registry: RwLock::new(None),
1263            secrets,
1264            oauth_config,
1265            cache,
1266        })
1267    }
1268
1269    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1270        if let Some(cache) = &self.flows {
1271            return Ok(cache.descriptors.clone());
1272        }
1273        if let Some(manifest) = &self.manifest {
1274            let descriptors = manifest
1275                .flows
1276                .iter()
1277                .map(|flow| FlowDescriptor {
1278                    id: flow.id.as_str().to_string(),
1279                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1280                    profile: manifest.pack_id.as_str().to_string(),
1281                    version: manifest.version.to_string(),
1282                    description: None,
1283                })
1284                .collect();
1285            return Ok(descriptors);
1286        }
1287        Ok(Vec::new())
1288    }
1289
1290    #[allow(dead_code)]
1291    pub async fn run_flow(
1292        &self,
1293        flow_id: &str,
1294        input: serde_json::Value,
1295    ) -> Result<serde_json::Value> {
1296        let pack = Arc::new(
1297            PackRuntime::load(
1298                &self.path,
1299                Arc::clone(&self.config),
1300                self.mocks.clone(),
1301                self.archive_path.as_deref(),
1302                self.session_store.clone(),
1303                self.state_store.clone(),
1304                Arc::clone(&self.wasi_policy),
1305                self.secrets.clone(),
1306                self.oauth_config.clone(),
1307                false,
1308                ComponentResolution::default(),
1309            )
1310            .await?,
1311        );
1312
1313        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1314        let retry_config = self.config.retry_config().into();
1315        let mocks = pack.mocks.as_deref();
1316        let tenant = self.config.tenant.as_str();
1317
1318        let ctx = FlowContext {
1319            tenant,
1320            flow_id,
1321            node_id: None,
1322            tool: None,
1323            action: None,
1324            session_id: None,
1325            provider_id: None,
1326            retry_config,
1327            observer: None,
1328            mocks,
1329        };
1330
1331        let execution = engine.execute(ctx, input).await?;
1332        match execution.status {
1333            FlowStatus::Completed => Ok(execution.output),
1334            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1335                "status": "pending",
1336                "reason": wait.reason,
1337                "resume": wait.snapshot,
1338                "response": execution.output,
1339            })),
1340        }
1341    }
1342
1343    pub async fn invoke_component(
1344        &self,
1345        component_ref: &str,
1346        ctx: ComponentExecCtx,
1347        operation: &str,
1348        _config_json: Option<String>,
1349        input_json: String,
1350    ) -> Result<Value> {
1351        let pack_component = self
1352            .components
1353            .get(component_ref)
1354            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1355
1356        let mut linker = Linker::new(&self.engine);
1357        let allow_state_store = self.allows_state_store(component_ref);
1358        register_all(&mut linker, allow_state_store)?;
1359        add_component_control_to_linker(&mut linker)?;
1360
1361        let host_state = HostState::new(
1362            Arc::clone(&self.config),
1363            Arc::clone(&self.http_client),
1364            self.mocks.clone(),
1365            self.session_store.clone(),
1366            self.state_store.clone(),
1367            Arc::clone(&self.secrets),
1368            self.oauth_config.clone(),
1369            Some(ctx.clone()),
1370        )?;
1371        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1372        let mut store = wasmtime::Store::new(&self.engine, store_state);
1373        let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1374        let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1375            Ok(pre) => {
1376                let bindings = pre.instantiate_async(&mut store).await?;
1377                let node = bindings.greentic_component_node();
1378                let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1379                let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1380                component_api::invoke_result_from_v0_5(result)
1381            }
1382            Err(err) => {
1383                if is_missing_node_export(&err, "0.5.0") {
1384                    let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1385                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
1386                        component_api::v0_4::ComponentPre::new(pre_instance)?;
1387                    let bindings = pre.instantiate_async(&mut store).await?;
1388                    let node = bindings.greentic_component_node();
1389                    let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1390                    let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1391                    component_api::invoke_result_from_v0_4(result)
1392                } else {
1393                    return Err(err);
1394                }
1395            }
1396        };
1397
1398        match result {
1399            InvokeResult::Ok(body) => {
1400                if body.is_empty() {
1401                    return Ok(Value::Null);
1402                }
1403                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1404            }
1405            InvokeResult::Err(NodeError {
1406                code,
1407                message,
1408                retryable,
1409                backoff_ms,
1410                details,
1411            }) => {
1412                let mut obj = serde_json::Map::new();
1413                obj.insert("ok".into(), Value::Bool(false));
1414                let mut error = serde_json::Map::new();
1415                error.insert("code".into(), Value::String(code));
1416                error.insert("message".into(), Value::String(message));
1417                error.insert("retryable".into(), Value::Bool(retryable));
1418                if let Some(backoff) = backoff_ms {
1419                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1420                }
1421                if let Some(details) = details {
1422                    error.insert(
1423                        "details".into(),
1424                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1425                    );
1426                }
1427                obj.insert("error".into(), Value::Object(error));
1428                Ok(Value::Object(obj))
1429            }
1430        }
1431    }
1432
1433    pub fn resolve_provider(
1434        &self,
1435        provider_id: Option<&str>,
1436        provider_type: Option<&str>,
1437    ) -> Result<ProviderBinding> {
1438        let registry = self.provider_registry()?;
1439        registry.resolve(provider_id, provider_type)
1440    }
1441
1442    pub async fn invoke_provider(
1443        &self,
1444        binding: &ProviderBinding,
1445        ctx: ComponentExecCtx,
1446        op: &str,
1447        input_json: Vec<u8>,
1448    ) -> Result<Value> {
1449        let component_ref = &binding.component_ref;
1450        let pack_component = self
1451            .components
1452            .get(component_ref)
1453            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1454
1455        let mut linker = Linker::new(&self.engine);
1456        let allow_state_store = self.allows_state_store(component_ref);
1457        register_all(&mut linker, allow_state_store)?;
1458        add_component_control_to_linker(&mut linker)?;
1459        let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1460        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1461
1462        let host_state = HostState::new(
1463            Arc::clone(&self.config),
1464            Arc::clone(&self.http_client),
1465            self.mocks.clone(),
1466            self.session_store.clone(),
1467            self.state_store.clone(),
1468            Arc::clone(&self.secrets),
1469            self.oauth_config.clone(),
1470            Some(ctx),
1471        )?;
1472        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1473        let mut store = wasmtime::Store::new(&self.engine, store_state);
1474        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1475        let provider = bindings.greentic_provider_core_schema_core_api();
1476
1477        let result = provider.call_invoke(&mut store, op, &input_json)?;
1478        deserialize_json_bytes(result)
1479    }
1480
1481    fn provider_registry(&self) -> Result<ProviderRegistry> {
1482        if let Some(registry) = self.provider_registry.read().clone() {
1483            return Ok(registry);
1484        }
1485        let manifest = self
1486            .manifest
1487            .as_ref()
1488            .context("pack manifest required for provider resolution")?;
1489        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1490        let registry = ProviderRegistry::new(
1491            manifest,
1492            self.state_store.clone(),
1493            &self.config.tenant,
1494            &env,
1495        )?;
1496        *self.provider_registry.write() = Some(registry.clone());
1497        Ok(registry)
1498    }
1499
1500    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1501        if let Some(cache) = &self.flows {
1502            return cache
1503                .flows
1504                .get(flow_id)
1505                .cloned()
1506                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1507        }
1508        if let Some(manifest) = &self.manifest {
1509            let entry = manifest
1510                .flows
1511                .iter()
1512                .find(|f| f.id.as_str() == flow_id)
1513                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1514            return Ok(entry.flow.clone());
1515        }
1516        bail!("flow '{flow_id}' not available (pack exports disabled)")
1517    }
1518
1519    pub fn metadata(&self) -> &PackMetadata {
1520        &self.metadata
1521    }
1522
1523    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1524        &self.metadata.secret_requirements
1525    }
1526
1527    pub fn missing_secrets(
1528        &self,
1529        tenant_ctx: &TypesTenantCtx,
1530    ) -> Vec<greentic_types::SecretRequirement> {
1531        let env = tenant_ctx.env.as_str().to_string();
1532        let tenant = tenant_ctx.tenant.as_str().to_string();
1533        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1534        self.required_secrets()
1535            .iter()
1536            .filter(|req| {
1537                // scope must match current context if provided
1538                if let Some(scope) = &req.scope {
1539                    if scope.env != env {
1540                        return false;
1541                    }
1542                    if scope.tenant != tenant {
1543                        return false;
1544                    }
1545                    if let Some(ref team_req) = scope.team
1546                        && team.as_ref() != Some(team_req)
1547                    {
1548                        return false;
1549                    }
1550                }
1551                read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1552            })
1553            .cloned()
1554            .collect()
1555    }
1556
1557    pub fn for_component_test(
1558        components: Vec<(String, PathBuf)>,
1559        flows: HashMap<String, FlowIR>,
1560        config: Arc<HostConfig>,
1561    ) -> Result<Self> {
1562        let engine = Engine::default();
1563        let engine_profile =
1564            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1565        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1566        let mut component_map = HashMap::new();
1567        for (name, path) in components {
1568            if !path.exists() {
1569                bail!("component artifact missing: {}", path.display());
1570            }
1571            let wasm_bytes = std::fs::read(&path)?;
1572            let component = Arc::new(
1573                Component::from_binary(&engine, &wasm_bytes)
1574                    .with_context(|| format!("failed to compile component {}", path.display()))?,
1575            );
1576            component_map.insert(
1577                name.clone(),
1578                PackComponent {
1579                    name,
1580                    version: "0.0.0".into(),
1581                    component,
1582                },
1583            );
1584        }
1585
1586        let mut flow_map = HashMap::new();
1587        let mut descriptors = Vec::new();
1588        for (id, ir) in flows {
1589            let flow_type = ir.flow_type.clone();
1590            let flow = flow_ir_to_flow(ir)?;
1591            flow_map.insert(id.clone(), flow);
1592            descriptors.push(FlowDescriptor {
1593                id: id.clone(),
1594                flow_type,
1595                profile: "test".into(),
1596                version: "0.0.0".into(),
1597                description: None,
1598            });
1599        }
1600        let flows_cache = PackFlows {
1601            descriptors: descriptors.clone(),
1602            flows: flow_map,
1603            metadata: PackMetadata::fallback(Path::new("component-test")),
1604        };
1605
1606        Ok(Self {
1607            path: PathBuf::new(),
1608            archive_path: None,
1609            config,
1610            engine,
1611            metadata: PackMetadata::fallback(Path::new("component-test")),
1612            manifest: None,
1613            legacy_manifest: None,
1614            component_manifests: HashMap::new(),
1615            mocks: None,
1616            flows: Some(flows_cache),
1617            components: component_map,
1618            http_client: Arc::clone(&HTTP_CLIENT),
1619            pre_cache: Mutex::new(HashMap::new()),
1620            session_store: None,
1621            state_store: None,
1622            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1623            provider_registry: RwLock::new(None),
1624            secrets: crate::secrets::default_manager(),
1625            oauth_config: None,
1626            cache,
1627        })
1628    }
1629}
1630
1631fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1632    let message = err.to_string();
1633    message.contains("no exported instance named")
1634        && message.contains(&format!("greentic:component/node@{version}"))
1635}
1636
/// Flows loaded for a pack, together with their listing descriptors and the
/// pack metadata resolved alongside them.
struct PackFlows {
    /// One descriptor per flow; returned by `PackRuntime::list_flows`.
    descriptors: Vec<FlowDescriptor>,
    /// Flows keyed by flow id; looked up by `PackRuntime::load_flow`.
    flows: HashMap<String, Flow>,
    /// Pack metadata; copied into the runtime when the pack is loaded.
    metadata: PackMetadata,
}
1642
/// Extension identifiers associated with runtime flow bundles.
// NOTE(review): presumably these are the manifest extension keys probed (in this
// order) when looking for a `RuntimeFlowBundle`; the consumer is not visible from
// this part of the file — confirm.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1648
/// Deserialized container holding a list of runtime flows.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The flows carried by the bundle.
    flows: Vec<RuntimeFlow>,
}
1653
/// Wire representation of a single runtime flow, converted into a `Flow` by
/// `runtime_flow_to_flow`.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow id as a raw string; parsed into a `FlowId` during conversion.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Optional schema version; conversion falls back to `"1.0"` when absent.
    #[serde(default)]
    schema_version: Option<String>,
    /// Optional start node; becomes the `default` entrypoint when `entrypoints`
    /// is empty.
    #[serde(default)]
    start: Option<String>,
    /// Named entrypoints, passed through to the converted flow.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by their raw node id.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; conversion uses the default value when absent.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
1669
/// Wire representation of a node inside a `RuntimeFlow`.
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component id as a raw string; also accepted under the key `component`.
    #[serde(alias = "component")]
    component_id: String,
    /// Optional operation name; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Payload used as the node's input mapping; also accepted under the keys
    /// `payload` or `input`, and left at `Value`'s default when absent.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Optional routing; conversion falls back to `Routing::End`.
    #[serde(default)]
    routing: Option<Routing>,
    /// Optional telemetry hints; conversion uses the default value when absent.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
1683
1684fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1685    if bytes.is_empty() {
1686        return Ok(Value::Null);
1687    }
1688    serde_json::from_slice(&bytes).or_else(|_| {
1689        String::from_utf8(bytes)
1690            .map(Value::String)
1691            .map_err(|err| anyhow!(err))
1692    })
1693}
1694
1695impl PackFlows {
1696    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1697        if let Some(flows) = flows_from_runtime_extension(&manifest) {
1698            return flows;
1699        }
1700        let descriptors = manifest
1701            .flows
1702            .iter()
1703            .map(|entry| FlowDescriptor {
1704                id: entry.id.as_str().to_string(),
1705                flow_type: flow_kind_to_str(entry.kind).to_string(),
1706                profile: manifest.pack_id.as_str().to_string(),
1707                version: manifest.version.to_string(),
1708                description: None,
1709            })
1710            .collect();
1711        let mut flows = HashMap::new();
1712        for entry in &manifest.flows {
1713            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1714        }
1715        Self {
1716            metadata: PackMetadata::from_manifest(&manifest),
1717            descriptors,
1718            flows,
1719        }
1720    }
1721}
1722
1723fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1724    let extensions = manifest.extensions.as_ref()?;
1725    let extension = extensions.iter().find_map(|(key, ext)| {
1726        if RUNTIME_FLOW_EXTENSION_IDS
1727            .iter()
1728            .any(|candidate| candidate == key)
1729        {
1730            Some(ext)
1731        } else {
1732            None
1733        }
1734    })?;
1735    let runtime_flows = match decode_runtime_flow_extension(extension) {
1736        Some(flows) if !flows.is_empty() => flows,
1737        _ => return None,
1738    };
1739
1740    let descriptors = runtime_flows
1741        .iter()
1742        .map(|flow| FlowDescriptor {
1743            id: flow.id.as_str().to_string(),
1744            flow_type: flow_kind_to_str(flow.kind).to_string(),
1745            profile: manifest.pack_id.as_str().to_string(),
1746            version: manifest.version.to_string(),
1747            description: None,
1748        })
1749        .collect::<Vec<_>>();
1750    let flows = runtime_flows
1751        .into_iter()
1752        .map(|flow| (flow.id.as_str().to_string(), flow))
1753        .collect();
1754
1755    Some(PackFlows {
1756        metadata: PackMetadata::from_manifest(manifest),
1757        descriptors,
1758        flows,
1759    })
1760}
1761
1762fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1763    let value = match extension.inline.as_ref()? {
1764        ExtensionInline::Other(value) => value.clone(),
1765        _ => return None,
1766    };
1767
1768    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1769        return Some(collect_runtime_flows(bundle.flows));
1770    }
1771
1772    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1773        return Some(collect_runtime_flows(flows));
1774    }
1775
1776    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1777        return Some(flows);
1778    }
1779
1780    warn!(
1781        extension = %extension.kind,
1782        version = %extension.version,
1783        "runtime flow extension present but could not be decoded"
1784    );
1785    None
1786}
1787
1788fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1789    flows
1790        .into_iter()
1791        .filter_map(|flow| match runtime_flow_to_flow(flow) {
1792            Ok(flow) => Some(flow),
1793            Err(err) => {
1794                warn!(error = %err, "failed to decode runtime flow");
1795                None
1796            }
1797        })
1798        .collect()
1799}
1800
1801fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1802    let flow_id = FlowId::from_str(&runtime.id)
1803        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1804    let mut entrypoints = runtime.entrypoints;
1805    if entrypoints.is_empty()
1806        && let Some(start) = &runtime.start
1807    {
1808        entrypoints.insert("default".into(), Value::String(start.clone()));
1809    }
1810
1811    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1812    for (id, node) in runtime.nodes {
1813        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1814        let component_id = ComponentId::from_str(&node.component_id)
1815            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1816        let component = FlowComponentRef {
1817            id: component_id,
1818            pack_alias: None,
1819            operation: node.operation_name,
1820        };
1821        let routing = node.routing.unwrap_or(Routing::End);
1822        let telemetry = node.telemetry.unwrap_or_default();
1823        nodes.insert(
1824            node_id.clone(),
1825            Node {
1826                id: node_id,
1827                component,
1828                input: InputMapping {
1829                    mapping: node.operation_payload,
1830                },
1831                output: OutputMapping {
1832                    mapping: Value::Null,
1833                },
1834                routing,
1835                telemetry,
1836            },
1837        );
1838    }
1839
1840    Ok(Flow {
1841        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1842        id: flow_id,
1843        kind: runtime.kind,
1844        entrypoints,
1845        nodes,
1846        metadata: runtime.metadata.unwrap_or_default(),
1847    })
1848}
1849
1850fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1851    match kind {
1852        greentic_types::FlowKind::Messaging => "messaging",
1853        greentic_types::FlowKind::Event => "event",
1854        greentic_types::FlowKind::ComponentConfig => "component-config",
1855        greentic_types::FlowKind::Job => "job",
1856        greentic_types::FlowKind::Http => "http",
1857    }
1858}
1859
1860fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1861    let mut file = archive
1862        .by_name(name)
1863        .with_context(|| format!("entry {name} missing from archive"))?;
1864    let mut buf = Vec::new();
1865    file.read_to_end(&mut buf)?;
1866    Ok(buf)
1867}
1868
1869fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1870    for node in doc.nodes.values_mut() {
1871        let Some((component_ref, payload)) = node
1872            .raw
1873            .iter()
1874            .next()
1875            .map(|(key, value)| (key.clone(), value.clone()))
1876        else {
1877            continue;
1878        };
1879        if component_ref.starts_with("emit.") {
1880            node.operation = Some(component_ref);
1881            node.payload = payload;
1882            node.raw.clear();
1883            continue;
1884        }
1885        let (target_component, operation, input, config) =
1886            infer_component_exec(&payload, &component_ref);
1887        let mut payload_obj = serde_json::Map::new();
1888        // component.exec is meta; ensure the payload carries the actual target component.
1889        payload_obj.insert("component".into(), Value::String(target_component));
1890        payload_obj.insert("operation".into(), Value::String(operation));
1891        payload_obj.insert("input".into(), input);
1892        if let Some(cfg) = config {
1893            payload_obj.insert("config".into(), cfg);
1894        }
1895        node.operation = Some("component.exec".to_string());
1896        node.payload = Value::Object(payload_obj);
1897        node.raw.clear();
1898    }
1899    doc
1900}
1901
1902fn infer_component_exec(
1903    payload: &Value,
1904    component_ref: &str,
1905) -> (String, String, Value, Option<Value>) {
1906    let default_op = if component_ref.starts_with("templating.") {
1907        "render"
1908    } else {
1909        "invoke"
1910    }
1911    .to_string();
1912
1913    if let Value::Object(map) = payload {
1914        let op = map
1915            .get("op")
1916            .or_else(|| map.get("operation"))
1917            .and_then(Value::as_str)
1918            .map(|s| s.to_string())
1919            .unwrap_or_else(|| default_op.clone());
1920
1921        let mut input = map.clone();
1922        let config = input.remove("config");
1923        let component = input
1924            .get("component")
1925            .or_else(|| input.get("component_ref"))
1926            .and_then(Value::as_str)
1927            .map(|s| s.to_string())
1928            .unwrap_or_else(|| component_ref.to_string());
1929        input.remove("component");
1930        input.remove("component_ref");
1931        input.remove("op");
1932        input.remove("operation");
1933        return (component, op, Value::Object(input), config);
1934    }
1935
1936    (component_ref.to_string(), default_op, payload.clone(), None)
1937}
1938
/// A component a pack declares, as produced by `component_specs`.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component id (or lock/source entry name when no id is recorded).
    id: String,
    /// Declared version; `"0.0.0"` when derived from a pack lock or component sources.
    version: String,
    /// Wasm path from a legacy manifest, relative to the pack root; `None` otherwise.
    legacy_path: Option<String>,
}
1945
/// Where a component's wasm comes from and which digests are recorded for it.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Recorded digest for the component, if any.
    digest: Option<String>,
    /// Source reference the component was declared with.
    source: ComponentSourceRef,
    /// Whether the wasm is bundled at a path or must be obtained remotely.
    artifact: ComponentArtifactLocation,
    /// Normalized sha256 recorded for a bundled wasm file, if any.
    expected_wasm_sha256: Option<String>,
    /// Set when a bundled pack.lock entry has no hash and missing hashes are allowed.
    // NOTE(review): how consumers honour this flag is outside this chunk — confirm.
    skip_digest_verification: bool,
}
1954
/// Location of a component's wasm artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// Bundled with the pack; `wasm_path` is resolved against the materialized
    /// pack root or looked up as an entry in the pack archive.
    Inline { wasm_path: String },
    /// Not bundled with the pack.
    Remote,
}
1960
/// Deserialized `pack.lock` / `pack.lock.json` document.
#[derive(Clone, Debug, Deserialize)]
struct PackLockV1 {
    /// Schema version; `load_pack_lock` rejects anything other than `1`.
    schema_version: u32,
    /// Locked component entries.
    components: Vec<PackLockComponent>,
}
1966
/// One component entry of a pack lock.
///
/// Several fields exist under a current and a legacy key; when both are present
/// `component_sources_table_from_pack_lock` requires them to agree.
#[derive(Clone, Debug, Deserialize)]
struct PackLockComponent {
    /// Entry name; must be unique within the lock.
    name: String,
    /// Source reference (current key `source_ref`).
    #[serde(default, rename = "source_ref")]
    source_ref: Option<String>,
    /// Source reference (legacy key `ref`).
    #[serde(default, rename = "ref")]
    legacy_ref: Option<String>,
    /// Component id; preferred over `name` as the spec id when present.
    #[serde(default)]
    component_id: Option<ComponentId>,
    /// Explicit bundled flag; an entry with a bundled path counts as bundled regardless.
    #[serde(default)]
    bundled: Option<bool>,
    /// Path of the bundled wasm (current key `bundled_path`).
    #[serde(default, rename = "bundled_path")]
    bundled_path: Option<String>,
    /// Path of the bundled wasm (legacy key `path`).
    #[serde(default, rename = "path")]
    legacy_path: Option<String>,
    /// sha256 of the bundled wasm (current key `wasm_sha256`).
    #[serde(default)]
    wasm_sha256: Option<String>,
    /// sha256 of the bundled wasm (legacy key `sha256`).
    #[serde(default, rename = "sha256")]
    legacy_sha256: Option<String>,
    /// Digest for a non-bundled component; preferred over `digest`.
    #[serde(default)]
    resolved_digest: Option<String>,
    /// Fallback digest for a non-bundled component when `resolved_digest` is absent.
    #[serde(default)]
    digest: Option<String>,
}
1991
1992fn component_specs(
1993    manifest: Option<&greentic_types::PackManifest>,
1994    legacy_manifest: Option<&legacy_pack::PackManifest>,
1995    component_sources: Option<&ComponentSourcesV1>,
1996    pack_lock: Option<&PackLockV1>,
1997) -> Vec<ComponentSpec> {
1998    if let Some(manifest) = manifest {
1999        if !manifest.components.is_empty() {
2000            return manifest
2001                .components
2002                .iter()
2003                .map(|entry| ComponentSpec {
2004                    id: entry.id.as_str().to_string(),
2005                    version: entry.version.to_string(),
2006                    legacy_path: None,
2007                })
2008                .collect();
2009        }
2010        if let Some(lock) = pack_lock {
2011            let mut seen = HashSet::new();
2012            let mut specs = Vec::new();
2013            for entry in &lock.components {
2014                let id = entry
2015                    .component_id
2016                    .as_ref()
2017                    .map(|id| id.as_str())
2018                    .unwrap_or(entry.name.as_str());
2019                if seen.insert(id.to_string()) {
2020                    specs.push(ComponentSpec {
2021                        id: id.to_string(),
2022                        version: "0.0.0".to_string(),
2023                        legacy_path: None,
2024                    });
2025                }
2026            }
2027            return specs;
2028        }
2029        if let Some(sources) = component_sources {
2030            let mut seen = HashSet::new();
2031            let mut specs = Vec::new();
2032            for entry in &sources.components {
2033                let id = entry
2034                    .component_id
2035                    .as_ref()
2036                    .map(|id| id.as_str())
2037                    .unwrap_or(entry.name.as_str());
2038                if seen.insert(id.to_string()) {
2039                    specs.push(ComponentSpec {
2040                        id: id.to_string(),
2041                        version: "0.0.0".to_string(),
2042                        legacy_path: None,
2043                    });
2044                }
2045            }
2046            return specs;
2047        }
2048    }
2049    if let Some(legacy_manifest) = legacy_manifest {
2050        return legacy_manifest
2051            .components
2052            .iter()
2053            .map(|entry| ComponentSpec {
2054                id: entry.name.clone(),
2055                version: entry.version.to_string(),
2056                legacy_path: Some(entry.file_wasm.clone()),
2057            })
2058            .collect();
2059    }
2060    Vec::new()
2061}
2062
2063fn component_sources_table(
2064    sources: Option<&ComponentSourcesV1>,
2065) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2066    let Some(sources) = sources else {
2067        return Ok(None);
2068    };
2069    let mut table = HashMap::new();
2070    for entry in &sources.components {
2071        let artifact = match &entry.artifact {
2072            ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2073                wasm_path: wasm_path.clone(),
2074            },
2075            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2076        };
2077        let info = ComponentSourceInfo {
2078            digest: Some(entry.resolved.digest.clone()),
2079            source: entry.source.clone(),
2080            artifact,
2081            expected_wasm_sha256: None,
2082            skip_digest_verification: false,
2083        };
2084        if let Some(component_id) = entry.component_id.as_ref() {
2085            table.insert(component_id.as_str().to_string(), info.clone());
2086        }
2087        table.insert(entry.name.clone(), info);
2088    }
2089    Ok(Some(table))
2090}
2091
2092fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2093    let lock_path = if path.is_dir() {
2094        let candidate = path.join("pack.lock");
2095        if candidate.exists() {
2096            Some(candidate)
2097        } else {
2098            let candidate = path.join("pack.lock.json");
2099            candidate.exists().then_some(candidate)
2100        }
2101    } else {
2102        None
2103    };
2104    let Some(lock_path) = lock_path else {
2105        return Ok(None);
2106    };
2107    let raw = std::fs::read_to_string(&lock_path)
2108        .with_context(|| format!("failed to read {}", lock_path.display()))?;
2109    let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2110    if lock.schema_version != 1 {
2111        bail!("pack.lock schema_version must be 1");
2112    }
2113    Ok(Some(lock))
2114}
2115
/// Lists the directories in which to look for a pack lock.
///
/// A directory pack is its own single root. Otherwise the roots are the parent
/// and grandparent (when they exist) of `archive_hint`, or of `pack_path` when
/// no hint is given.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    let anchor = archive_hint.unwrap_or(pack_path);
    // `ancestors` yields the path itself first, then each successive parent.
    anchor
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
2140
2141fn normalize_sha256(digest: &str) -> Result<String> {
2142    let trimmed = digest.trim();
2143    if trimmed.is_empty() {
2144        bail!("sha256 digest cannot be empty");
2145    }
2146    if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2147        if stripped.is_empty() {
2148            bail!("sha256 digest must include hex bytes after sha256:");
2149        }
2150        return Ok(trimmed.to_string());
2151    }
2152    if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2153        return Ok(format!("sha256:{trimmed}"));
2154    }
2155    bail!("sha256 digest must be hex or sha256:<hex>");
2156}
2157
2158fn component_sources_table_from_pack_lock(
2159    lock: &PackLockV1,
2160    allow_missing_hash: bool,
2161) -> Result<HashMap<String, ComponentSourceInfo>> {
2162    let mut table = HashMap::new();
2163    let mut names = HashSet::new();
2164    for entry in &lock.components {
2165        if !names.insert(entry.name.clone()) {
2166            bail!(
2167                "pack.lock contains duplicate component name `{}`",
2168                entry.name
2169            );
2170        }
2171        let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2172            (Some(primary), Some(legacy)) => {
2173                if primary != legacy {
2174                    bail!(
2175                        "pack.lock component {} has conflicting refs: {} vs {}",
2176                        entry.name,
2177                        primary,
2178                        legacy
2179                    );
2180                }
2181                primary.as_str()
2182            }
2183            (Some(primary), None) => primary.as_str(),
2184            (None, Some(legacy)) => legacy.as_str(),
2185            (None, None) => {
2186                bail!("pack.lock component {} missing source_ref", entry.name);
2187            }
2188        };
2189        let source: ComponentSourceRef = source_ref
2190            .parse()
2191            .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2192        let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2193            (Some(primary), Some(legacy)) => {
2194                if primary != legacy {
2195                    bail!(
2196                        "pack.lock component {} has conflicting bundled paths: {} vs {}",
2197                        entry.name,
2198                        primary,
2199                        legacy
2200                    );
2201                }
2202                Some(primary.clone())
2203            }
2204            (Some(primary), None) => Some(primary.clone()),
2205            (None, Some(legacy)) => Some(legacy.clone()),
2206            (None, None) => None,
2207        };
2208        let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2209        let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2210            let wasm_path = bundled_path.ok_or_else(|| {
2211                anyhow!(
2212                    "pack.lock component {} marked bundled but bundled_path is missing",
2213                    entry.name
2214                )
2215            })?;
2216            let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2217                (Some(primary), Some(legacy)) => {
2218                    if primary != legacy {
2219                        bail!(
2220                            "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2221                            entry.name,
2222                            primary,
2223                            legacy
2224                        );
2225                    }
2226                    Some(primary.as_str())
2227                }
2228                (Some(primary), None) => Some(primary.as_str()),
2229                (None, Some(legacy)) => Some(legacy.as_str()),
2230                (None, None) => None,
2231            };
2232            let expected = match expected_raw {
2233                Some(value) => Some(normalize_sha256(value)?),
2234                None => None,
2235            };
2236            if expected.is_none() && !allow_missing_hash {
2237                bail!(
2238                    "pack.lock component {} missing wasm_sha256 for bundled component",
2239                    entry.name
2240                );
2241            }
2242            (
2243                ComponentArtifactLocation::Inline { wasm_path },
2244                expected.clone(),
2245                expected,
2246                allow_missing_hash && expected_raw.is_none(),
2247            )
2248        } else {
2249            if source.is_tag() {
2250                bail!(
2251                    "component {} uses tag ref {} but is not bundled; rebuild the pack",
2252                    entry.name,
2253                    source
2254                );
2255            }
2256            let expected = entry
2257                .resolved_digest
2258                .as_deref()
2259                .or(entry.digest.as_deref())
2260                .ok_or_else(|| {
2261                    anyhow!(
2262                        "pack.lock component {} missing resolved_digest for remote component",
2263                        entry.name
2264                    )
2265                })?;
2266            (
2267                ComponentArtifactLocation::Remote,
2268                Some(normalize_digest(expected)),
2269                None,
2270                false,
2271            )
2272        };
2273        let info = ComponentSourceInfo {
2274            digest,
2275            source,
2276            artifact,
2277            expected_wasm_sha256,
2278            skip_digest_verification,
2279        };
2280        if let Some(component_id) = entry.component_id.as_ref() {
2281            let key = component_id.as_str().to_string();
2282            if table.contains_key(&key) {
2283                bail!(
2284                    "pack.lock contains duplicate component id `{}`",
2285                    component_id.as_str()
2286                );
2287            }
2288            table.insert(key, info.clone());
2289        }
2290        if entry.name
2291            != entry
2292                .component_id
2293                .as_ref()
2294                .map(|id| id.as_str())
2295                .unwrap_or("")
2296        {
2297            table.insert(entry.name.clone(), info);
2298        }
2299    }
2300    Ok(table)
2301}
2302
2303fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2304    if let Some(path) = &spec.legacy_path {
2305        return root.join(path);
2306    }
2307    root.join("components").join(format!("{}.wasm", spec.id))
2308}
2309
/// Ensures a digest carries an algorithm prefix.
///
/// Values already starting with `sha256:` or `blake3:` are returned unchanged;
/// anything else is assumed to be sha256 and gets the `sha256:` prefix.
fn normalize_digest(digest: &str) -> String {
    let has_known_prefix = ["sha256:", "blake3:"]
        .iter()
        .any(|prefix| digest.starts_with(prefix));
    if has_known_prefix {
        digest.to_owned()
    } else {
        format!("sha256:{digest}")
    }
}
2317
2318fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2319    if digest.starts_with("blake3:") {
2320        let hash = blake3::hash(bytes);
2321        return Ok(format!("blake3:{}", hash.to_hex()));
2322    }
2323    let mut hasher = sha2::Sha256::new();
2324    hasher.update(bytes);
2325    Ok(format!("sha256:{:x}", hasher.finalize()))
2326}
2327
2328fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2329    let mut hasher = sha2::Sha256::new();
2330    hasher.update(bytes);
2331    format!("sha256:{:x}", hasher.finalize())
2332}
2333
2334fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2335    let wasm_digest = digest
2336        .map(normalize_digest)
2337        .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2338    ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2339}
2340
2341async fn compile_component_with_cache(
2342    cache: &CacheManager,
2343    engine: &Engine,
2344    digest: Option<&str>,
2345    bytes: Vec<u8>,
2346) -> Result<Arc<Component>> {
2347    let key = build_artifact_key(cache, digest, &bytes);
2348    cache.get_component(engine, &key, || Ok(bytes)).await
2349}
2350
2351fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2352    let normalized_expected = normalize_digest(expected);
2353    let actual = compute_digest_for(bytes, &normalized_expected)?;
2354    if normalize_digest(&actual) != normalized_expected {
2355        bail!(
2356            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2357        );
2358    }
2359    Ok(())
2360}
2361
2362fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2363    let normalized_expected = normalize_sha256(expected)?;
2364    let actual = compute_sha256_digest_for(bytes);
2365    if actual != normalized_expected {
2366        bail!(
2367            "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2368        );
2369    }
2370    Ok(())
2371}
2372
/// Tests for pack-lock parsing and bundled component hash verification.
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    /// A non-bundled entry whose source ref is a tag must be rejected with a
    /// message telling the user to rebuild the pack.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![PackLockComponent {
                name: "templates".to_string(),
                source_ref: Some("oci://registry.test/templates:latest".to_string()),
                legacy_ref: None,
                component_id: None,
                bundled: Some(false),
                bundled_path: None,
                legacy_path: None,
                wasm_sha256: None,
                legacy_sha256: None,
                resolved_digest: None,
                digest: None,
            }],
        };
        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
        assert!(
            err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
            "unexpected error: {err}"
        );
    }

    /// Loading an inline component whose recorded sha256 does not match the
    /// file on disk must fail with a bundled digest mismatch.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let temp = TempDir::new().expect("temp dir");
        let engine = Engine::default();
        let engine_profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        let cache_config = CacheConfig {
            root: temp.path().join("cache"),
            ..CacheConfig::default()
        };
        let cache = CacheManager::new(cache_config, engine_profile);
        // Copy a real wasm fixture into the temp root so the inline path resolves.
        let wasm_path = temp.path().join("component.wasm");
        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
        std::fs::write(&wasm_path, &bytes).expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::new();
        missing.insert(spec.id.clone());

        // The recorded hash is deliberately wrong for the fixture bytes.
        let mut sources = HashMap::new();
        sources.insert(
            spec.id.clone(),
            ComponentSourceInfo {
                digest: Some("sha256:deadbeef".to_string()),
                source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
                artifact: ComponentArtifactLocation::Inline {
                    wasm_path: "component.wasm".to_string(),
                },
                expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
                skip_digest_verification: false,
            },
        );

        let mut loaded = HashMap::new();
        let result = rt.block_on(load_components_from_sources(
            &cache,
            &engine,
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(temp.path()),
            None,
        ));
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
2462
2463fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
2464    let mut opts = DistOptions {
2465        allow_tags: true,
2466        ..DistOptions::default()
2467    };
2468    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2469        opts.cache_dir = cache_dir;
2470    }
2471    if component_resolution.dist_offline {
2472        opts.offline = true;
2473    }
2474    opts
2475}
2476
2477#[allow(clippy::too_many_arguments)]
2478async fn load_components_from_sources(
2479    cache: &CacheManager,
2480    engine: &Engine,
2481    component_sources: &HashMap<String, ComponentSourceInfo>,
2482    component_resolution: &ComponentResolution,
2483    specs: &[ComponentSpec],
2484    missing: &mut HashSet<String>,
2485    into: &mut HashMap<String, PackComponent>,
2486    materialized_root: Option<&Path>,
2487    archive_hint: Option<&Path>,
2488) -> Result<()> {
2489    let mut archive = if let Some(path) = archive_hint {
2490        Some(
2491            ZipArchive::new(File::open(path)?)
2492                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
2493        )
2494    } else {
2495        None
2496    };
2497    let mut dist_client: Option<DistClient> = None;
2498
2499    for spec in specs {
2500        if !missing.contains(&spec.id) {
2501            continue;
2502        }
2503        let Some(source) = component_sources.get(&spec.id) else {
2504            continue;
2505        };
2506
2507        let bytes = match &source.artifact {
2508            ComponentArtifactLocation::Inline { wasm_path } => {
2509                if let Some(root) = materialized_root {
2510                    let path = root.join(wasm_path);
2511                    if path.exists() {
2512                        std::fs::read(&path).with_context(|| {
2513                            format!(
2514                                "failed to read inline component {} from {}",
2515                                spec.id,
2516                                path.display()
2517                            )
2518                        })?
2519                    } else if archive.is_none() {
2520                        bail!("inline component {} missing at {}", spec.id, path.display());
2521                    } else {
2522                        read_entry(
2523                            archive.as_mut().expect("archive present when needed"),
2524                            wasm_path,
2525                        )
2526                        .with_context(|| {
2527                            format!(
2528                                "inline component {} missing at {} in pack archive",
2529                                spec.id, wasm_path
2530                            )
2531                        })?
2532                    }
2533                } else if let Some(archive) = archive.as_mut() {
2534                    read_entry(archive, wasm_path).with_context(|| {
2535                        format!(
2536                            "inline component {} missing at {} in pack archive",
2537                            spec.id, wasm_path
2538                        )
2539                    })?
2540                } else {
2541                    bail!(
2542                        "inline component {} missing and no pack source available",
2543                        spec.id
2544                    );
2545                }
2546            }
2547            ComponentArtifactLocation::Remote => {
2548                if source.source.is_tag() {
2549                    bail!(
2550                        "component {} uses tag ref {} but is not bundled; rebuild the pack",
2551                        spec.id,
2552                        source.source
2553                    );
2554                }
2555                let client = dist_client.get_or_insert_with(|| {
2556                    DistClient::new(dist_options_from(component_resolution))
2557                });
2558                let reference = source.source.to_string();
2559                let digest = source.digest.as_deref().ok_or_else(|| {
2560                    anyhow!(
2561                        "component {} missing expected digest for remote component",
2562                        spec.id
2563                    )
2564                })?;
2565                let cache_path = if component_resolution.dist_offline {
2566                    client
2567                        .fetch_digest(digest)
2568                        .await
2569                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2570                } else {
2571                    let resolved = client
2572                        .resolve_ref(&reference)
2573                        .await
2574                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2575                    let expected = normalize_digest(digest);
2576                    let actual = normalize_digest(&resolved.digest);
2577                    if expected != actual {
2578                        bail!(
2579                            "component {} digest mismatch after fetch: expected {}, got {}",
2580                            spec.id,
2581                            expected,
2582                            actual
2583                        );
2584                    }
2585                    resolved.cache_path.ok_or_else(|| {
2586                        anyhow!(
2587                            "component {} resolved from {} but cache path is missing",
2588                            spec.id,
2589                            reference
2590                        )
2591                    })?
2592                };
2593                std::fs::read(&cache_path).with_context(|| {
2594                    format!(
2595                        "failed to read cached component {} from {}",
2596                        spec.id,
2597                        cache_path.display()
2598                    )
2599                })?
2600            }
2601        };
2602
2603        if let Some(expected) = source.expected_wasm_sha256.as_deref() {
2604            verify_wasm_sha256(&spec.id, expected, &bytes)?;
2605        } else if source.skip_digest_verification {
2606            let actual = compute_sha256_digest_for(&bytes);
2607            warn!(
2608                component_id = %spec.id,
2609                digest = %actual,
2610                "bundled component missing wasm_sha256; allowing due to flag"
2611            );
2612        } else {
2613            let expected = source.digest.as_deref().ok_or_else(|| {
2614                anyhow!(
2615                    "component {} missing expected digest for verification",
2616                    spec.id
2617                )
2618            })?;
2619            verify_component_digest(&spec.id, expected, &bytes)?;
2620        }
2621        let component =
2622            compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
2623                .await
2624                .with_context(|| format!("failed to compile component {}", spec.id))?;
2625        into.insert(
2626            spec.id.clone(),
2627            PackComponent {
2628                name: spec.id.clone(),
2629                version: spec.version.clone(),
2630                component,
2631            },
2632        );
2633        missing.remove(&spec.id);
2634    }
2635
2636    Ok(())
2637}
2638
2639fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2640    match err {
2641        DistError::CacheMiss { reference: missing } => anyhow!(
2642            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2643            component_id,
2644            missing,
2645            reference
2646        ),
2647        DistError::Offline { reference: blocked } => anyhow!(
2648            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2649            component_id,
2650            blocked,
2651            reference
2652        ),
2653        DistError::AuthRequired { target } => anyhow!(
2654            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2655            component_id,
2656            target,
2657            reference
2658        ),
2659        other => anyhow!(
2660            "failed to resolve component {} from {}: {}",
2661            component_id,
2662            reference,
2663            other
2664        ),
2665    }
2666}
2667
2668async fn load_components_from_overrides(
2669    cache: &CacheManager,
2670    engine: &Engine,
2671    overrides: &HashMap<String, PathBuf>,
2672    specs: &[ComponentSpec],
2673    missing: &mut HashSet<String>,
2674    into: &mut HashMap<String, PackComponent>,
2675) -> Result<()> {
2676    for spec in specs {
2677        if !missing.contains(&spec.id) {
2678            continue;
2679        }
2680        let Some(path) = overrides.get(&spec.id) else {
2681            continue;
2682        };
2683        let bytes = std::fs::read(path)
2684            .with_context(|| format!("failed to read override component {}", path.display()))?;
2685        let component = compile_component_with_cache(cache, engine, None, bytes)
2686            .await
2687            .with_context(|| {
2688                format!(
2689                    "failed to compile component {} from override {}",
2690                    spec.id,
2691                    path.display()
2692                )
2693            })?;
2694        into.insert(
2695            spec.id.clone(),
2696            PackComponent {
2697                name: spec.id.clone(),
2698                version: spec.version.clone(),
2699                component,
2700            },
2701        );
2702        missing.remove(&spec.id);
2703    }
2704    Ok(())
2705}
2706
2707async fn load_components_from_dir(
2708    cache: &CacheManager,
2709    engine: &Engine,
2710    root: &Path,
2711    specs: &[ComponentSpec],
2712    missing: &mut HashSet<String>,
2713    into: &mut HashMap<String, PackComponent>,
2714) -> Result<()> {
2715    for spec in specs {
2716        if !missing.contains(&spec.id) {
2717            continue;
2718        }
2719        let path = component_path_for_spec(root, spec);
2720        if !path.exists() {
2721            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2722            continue;
2723        }
2724        let bytes = std::fs::read(&path)
2725            .with_context(|| format!("failed to read component {}", path.display()))?;
2726        let component = compile_component_with_cache(cache, engine, None, bytes)
2727            .await
2728            .with_context(|| {
2729                format!(
2730                    "failed to compile component {} from {}",
2731                    spec.id,
2732                    path.display()
2733                )
2734            })?;
2735        into.insert(
2736            spec.id.clone(),
2737            PackComponent {
2738                name: spec.id.clone(),
2739                version: spec.version.clone(),
2740                component,
2741            },
2742        );
2743        missing.remove(&spec.id);
2744    }
2745    Ok(())
2746}
2747
2748async fn load_components_from_archive(
2749    cache: &CacheManager,
2750    engine: &Engine,
2751    path: &Path,
2752    specs: &[ComponentSpec],
2753    missing: &mut HashSet<String>,
2754    into: &mut HashMap<String, PackComponent>,
2755) -> Result<()> {
2756    let mut archive = ZipArchive::new(File::open(path)?)
2757        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2758    for spec in specs {
2759        if !missing.contains(&spec.id) {
2760            continue;
2761        }
2762        let file_name = spec
2763            .legacy_path
2764            .clone()
2765            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2766        let bytes = match read_entry(&mut archive, &file_name) {
2767            Ok(bytes) => bytes,
2768            Err(err) => {
2769                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2770                continue;
2771            }
2772        };
2773        let component = compile_component_with_cache(cache, engine, None, bytes)
2774            .await
2775            .with_context(|| format!("failed to compile component {}", spec.id))?;
2776        into.insert(
2777            spec.id.clone(),
2778            PackComponent {
2779                name: spec.id.clone(),
2780                version: spec.version.clone(),
2781                component,
2782            },
2783        );
2784        missing.remove(&spec.id);
2785    }
2786    Ok(())
2787}
2788
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node written in the raw `<component>: <config>` form must come out of
    /// `normalize_flow_doc` as a `component.exec` operation whose payload
    /// names the component, uses the `render` operation, and carries the
    /// original config as `input`.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        // Node declared only through its raw component entry.
        let mut raw_fields = IndexMap::new();
        raw_fields.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );
        let start_node = NodeDoc {
            raw: raw_fields,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let normalized = normalize_flow_doc(FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            nodes,
        });

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        let expected_component = Value::String("templating.handlebars".into());
        let expected_operation = Value::String("render".into());
        let expected_input = json!({ "template": "Hi {{name}}" });
        assert_eq!(payload.get("component"), Some(&expected_component));
        assert_eq!(payload.get("operation"), Some(&expected_operation));
        assert_eq!(payload.get("input"), Some(&expected_input));
    }
}
2842
/// Summary metadata for a pack: its identifier, version, entry flows and
/// declared secret requirements.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Pack version, kept as a string.
    pub version: String,
    /// Ids of the pack's entry flows; an empty list when absent from the
    /// serialized form.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secret requirements declared for the pack; an empty list when absent
    /// from the serialized form.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
2852
2853impl PackMetadata {
2854    fn from_wasm(bytes: &[u8]) -> Option<Self> {
2855        let parser = Parser::new(0);
2856        for payload in parser.parse_all(bytes) {
2857            let payload = payload.ok()?;
2858            match payload {
2859                Payload::CustomSection(section) => {
2860                    if section.name() == "greentic.manifest"
2861                        && let Ok(meta) = Self::from_bytes(section.data())
2862                    {
2863                        return Some(meta);
2864                    }
2865                }
2866                Payload::DataSection(reader) => {
2867                    for segment in reader.into_iter().flatten() {
2868                        if let Ok(meta) = Self::from_bytes(segment.data) {
2869                            return Some(meta);
2870                        }
2871                    }
2872                }
2873                _ => {}
2874            }
2875        }
2876        None
2877    }
2878
2879    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2880        #[derive(Deserialize)]
2881        struct RawManifest {
2882            pack_id: String,
2883            version: String,
2884            #[serde(default)]
2885            entry_flows: Vec<String>,
2886            #[serde(default)]
2887            flows: Vec<RawFlow>,
2888            #[serde(default)]
2889            secret_requirements: Vec<greentic_types::SecretRequirement>,
2890        }
2891
2892        #[derive(Deserialize)]
2893        struct RawFlow {
2894            id: String,
2895        }
2896
2897        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2898        let mut entry_flows = if manifest.entry_flows.is_empty() {
2899            manifest.flows.iter().map(|f| f.id.clone()).collect()
2900        } else {
2901            manifest.entry_flows.clone()
2902        };
2903        entry_flows.retain(|id| !id.is_empty());
2904        Ok(Self {
2905            pack_id: manifest.pack_id,
2906            version: manifest.version,
2907            entry_flows,
2908            secret_requirements: manifest.secret_requirements,
2909        })
2910    }
2911
2912    pub fn fallback(path: &Path) -> Self {
2913        let pack_id = path
2914            .file_stem()
2915            .map(|s| s.to_string_lossy().into_owned())
2916            .unwrap_or_else(|| "unknown-pack".to_string());
2917        Self {
2918            pack_id,
2919            version: "0.0.0".to_string(),
2920            entry_flows: Vec::new(),
2921            secret_requirements: Vec::new(),
2922        }
2923    }
2924
2925    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2926        let entry_flows = manifest
2927            .flows
2928            .iter()
2929            .map(|flow| flow.id.as_str().to_string())
2930            .collect::<Vec<_>>();
2931        Self {
2932            pack_id: manifest.pack_id.as_str().to_string(),
2933            version: manifest.version.to_string(),
2934            entry_flows,
2935            secret_requirements: manifest.secret_requirements.clone(),
2936        }
2937    }
2938}