greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21    self as host_v1, HostFns, add_all_v1_to_linker,
22    runner_host_http::RunnerHostHttp,
23    runner_host_kv::RunnerHostKv,
24    secrets_store::{SecretsError, SecretsStoreHost},
25    state_store::{
26        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27        StateStoreHost, TenantCtx as StateTenantCtx,
28    },
29    telemetry_logger::{
30        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32        TenantCtx as TelemetryTenantCtx,
33    },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
41    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
42    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
43    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44    pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69
70use crate::config::HostConfig;
71use crate::secrets::{DynSecretsManager, read_secret_blocking};
72use crate::storage::state::STATE_PREFIX;
73use crate::storage::{DynSessionStore, DynStateStore};
74use crate::verify;
75use crate::wasi::RunnerWasiPolicy;
76use tracing::warn;
77use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
78use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
79
80use greentic_flow::model::FlowDoc;
81
/// A loaded pack plus the host-side handles it is executed with.
///
/// Groups the artifact location, the decoded manifest (new or legacy format),
/// the loaded flows and components, and the shared resources (HTTP client,
/// session/state stores, secrets manager, cache) held alongside them.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Component artifact path (wasm file).
    path: PathBuf,
    /// Optional archive (.gtpack) used to load flows/manifests.
    archive_path: Option<PathBuf>,
    /// Host configuration shared via `Arc`.
    config: Arc<HostConfig>,
    /// Wasm engine handle (re-exported through `crate::runtime_wasmtime`).
    engine: Engine,
    /// Pack-level metadata (see `PackMetadata`).
    metadata: PackMetadata,
    /// Manifest in the current `greentic_types` format, when one was decoded.
    manifest: Option<greentic_types::PackManifest>,
    /// Manifest in the legacy `greentic_pack::builder` format, when one was decoded.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Component manifests keyed by string — presumably the component id; confirm.
    component_manifests: HashMap<String, ComponentManifest>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows loaded from the pack, if any.
    flows: Option<PackFlows>,
    /// Loaded components keyed by string — presumably the component id; confirm.
    components: HashMap<String, PackComponent>,
    /// Blocking HTTP client shared via `Arc`.
    http_client: Arc<BlockingClient>,
    /// Mutex-guarded map of `InstancePre` values.
    /// NOTE(review): key semantics are not visible in this part of the file.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    /// Optional session store handle.
    session_store: Option<DynSessionStore>,
    /// Optional state store handle.
    state_store: Option<DynStateStore>,
    /// WASI policy shared via `Arc`.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Provider registry behind a read/write lock; `None` until one is set.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Secrets manager handle.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// Cache manager (see `crate::cache`).
    cache: CacheManager,
}
107
/// A single component of a pack: its name, version and the `Component`
/// handle, shared via `Arc`.
struct PackComponent {
    /// Component name (currently unread, hence the lint allowance).
    #[allow(dead_code)]
    name: String,
    /// Component version string (currently unread, hence the lint allowance).
    #[allow(dead_code)]
    version: String,
    /// Shared `Component` handle.
    component: Arc<Component>,
}
115
/// Options describing where component wasm artifacts may be resolved from.
///
/// `Default` yields no materialized root, no overrides, online distribution,
/// no explicit cache directory, and hashes required.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
    pub materialized_root: Option<PathBuf>,
    /// Explicit overrides mapping component id -> wasm path.
    pub overrides: HashMap<String, PathBuf>,
    /// If true, do not fetch remote components; require cached artifacts.
    pub dist_offline: bool,
    /// Optional cache directory for resolved remote components.
    pub dist_cache_dir: Option<PathBuf>,
    /// Allow bundled components without wasm_sha256 (dev-only escape hatch).
    pub allow_missing_hash: bool,
}
129
130fn build_blocking_client() -> BlockingClient {
131    std::thread::spawn(|| {
132        BlockingClient::builder()
133            .no_proxy()
134            .build()
135            .expect("blocking client")
136    })
137    .join()
138    .expect("client build thread panicked")
139}
140
141fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
142    let (root, candidate) = if path.is_absolute() {
143        let parent = path
144            .parent()
145            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
146        let root = parent
147            .canonicalize()
148            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
149        let file = path
150            .file_name()
151            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
152        (root, PathBuf::from(file))
153    } else {
154        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
155        let base = if let Some(parent) = path.parent() {
156            cwd.join(parent)
157        } else {
158            cwd
159        };
160        let root = base
161            .canonicalize()
162            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
163        let file = path
164            .file_name()
165            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
166        (root, PathBuf::from(file))
167    };
168    let safe = normalize_under_root(&root, &candidate)?;
169    Ok((root, safe))
170}
171
/// Process-wide blocking HTTP client, built lazily on first access by
/// `build_blocking_client` and handed out behind an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
173
/// Serializable summary of a flow contained in a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Identifier of the pack the flow belongs to.
    pub pack_id: String,
    /// Profile name.
    /// NOTE(review): the legacy loaders in this file fill this with the pack id — confirm intent.
    pub profile: String,
    /// Pack version string.
    pub version: String,
    /// Optional human-readable description; defaults to `None` when absent on deserialization.
    #[serde(default)]
    pub description: Option<String>,
}
185
/// Host-side state behind the host interfaces implemented in this file
/// (secrets store, HTTP client, state store, telemetry logger, runner HTTP/KV).
pub struct HostState {
    /// Host configuration (secrets policy, HTTP enablement, tenant).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name used when a call supplies no tenant context;
    /// read from `GREENTIC_ENV` at construction, defaulting to `local`.
    default_env: String,
    /// Optional session store handle (currently unread).
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store; state operations report `unavailable` without it.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real secrets/HTTP/telemetry handling.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used for real secret lookups.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host, default-constructed in `HostState::new`.
    oauth_host: OAuthBrokerHost,
    /// Optional execution context (tenant, flow, node) of the current invocation.
    exec_ctx: Option<ComponentExecCtx>,
}
199
200impl HostState {
201    #[allow(clippy::default_constructed_unit_structs)]
202    #[allow(clippy::too_many_arguments)]
203    pub fn new(
204        config: Arc<HostConfig>,
205        http_client: Arc<BlockingClient>,
206        mocks: Option<Arc<MockLayer>>,
207        session_store: Option<DynSessionStore>,
208        state_store: Option<DynStateStore>,
209        secrets: DynSecretsManager,
210        oauth_config: Option<OAuthBrokerConfig>,
211        exec_ctx: Option<ComponentExecCtx>,
212    ) -> Result<Self> {
213        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
214        Ok(Self {
215            config,
216            http_client,
217            default_env,
218            session_store,
219            state_store,
220            mocks,
221            secrets,
222            oauth_config,
223            oauth_host: OAuthBrokerHost::default(),
224            exec_ctx,
225        })
226    }
227
228    pub fn get_secret(&self, key: &str) -> Result<String> {
229        if provider_core_only::is_enabled() {
230            bail!(provider_core_only::blocked_message("secrets"))
231        }
232        if !self.config.secrets_policy.is_allowed(key) {
233            bail!("secret {key} is not permitted by bindings policy");
234        }
235        if let Some(mock) = &self.mocks
236            && let Some(value) = mock.secrets_lookup(key)
237        {
238            return Ok(value);
239        }
240        let ctx = self.config.tenant_ctx();
241        let bytes = read_secret_blocking(&self.secrets, &ctx, key)
242            .context("failed to read secret from manager")?;
243        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
244        Ok(value)
245    }
246
247    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
248        let tenant_raw = ctx
249            .as_ref()
250            .map(|ctx| ctx.tenant.clone())
251            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
252            .unwrap_or_else(|| self.config.tenant.clone());
253        let env_raw = ctx
254            .as_ref()
255            .map(|ctx| ctx.env.clone())
256            .unwrap_or_else(|| self.default_env.clone());
257        let tenant_id = TenantId::from_str(&tenant_raw)
258            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
259        let env_id = EnvId::from_str(&env_raw)
260            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
261        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
262        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
263            if let Some(team) = exec_ctx.tenant.team.as_ref() {
264                let team_id =
265                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
266                tenant_ctx = tenant_ctx.with_team(Some(team_id));
267            }
268            if let Some(user) = exec_ctx.tenant.user.as_ref() {
269                let user_id =
270                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
271                tenant_ctx = tenant_ctx.with_user(Some(user_id));
272            }
273            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
274            if let Some(node) = exec_ctx.node_id.as_ref() {
275                tenant_ctx = tenant_ctx.with_node(node.clone());
276            }
277            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
278                tenant_ctx = tenant_ctx.with_session(session.clone());
279            }
280            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
281        }
282
283        if let Some(ctx) = ctx {
284            if let Some(team) = ctx.team.or(ctx.team_id) {
285                let team_id =
286                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
287                tenant_ctx = tenant_ctx.with_team(Some(team_id));
288            }
289            if let Some(user) = ctx.user.or(ctx.user_id) {
290                let user_id =
291                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
292                tenant_ctx = tenant_ctx.with_user(Some(user_id));
293            }
294            if let Some(flow) = ctx.flow_id {
295                tenant_ctx = tenant_ctx.with_flow(flow);
296            }
297            if let Some(node) = ctx.node_id {
298                tenant_ctx = tenant_ctx.with_node(node);
299            }
300            if let Some(provider) = ctx.provider_id {
301                tenant_ctx = tenant_ctx.with_provider(provider);
302            }
303            if let Some(session) = ctx.session_id {
304                tenant_ctx = tenant_ctx.with_session(session);
305            }
306            tenant_ctx.trace_id = ctx.trace_id;
307        }
308        Ok(tenant_ctx)
309    }
310
311    fn send_http_request(
312        &mut self,
313        req: HttpRequest,
314        opts: Option<HttpRequestOptionsV1_1>,
315        _ctx: Option<HttpTenantCtx>,
316    ) -> Result<HttpResponse, HttpClientError> {
317        if !self.config.http_enabled {
318            return Err(HttpClientError {
319                code: "denied".into(),
320                message: "http client disabled by policy".into(),
321            });
322        }
323
324        let mut mock_state = None;
325        let raw_body = req.body.clone();
326        if let Some(mock) = &self.mocks
327            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
328        {
329            match mock.http_begin(&meta) {
330                HttpDecision::Mock(response) => {
331                    let headers = response
332                        .headers
333                        .iter()
334                        .map(|(k, v)| (k.clone(), v.clone()))
335                        .collect();
336                    return Ok(HttpResponse {
337                        status: response.status,
338                        headers,
339                        body: response.body.clone().map(|b| b.into_bytes()),
340                    });
341                }
342                HttpDecision::Deny(reason) => {
343                    return Err(HttpClientError {
344                        code: "denied".into(),
345                        message: reason,
346                    });
347                }
348                HttpDecision::Passthrough { record } => {
349                    mock_state = Some((meta, record));
350                }
351            }
352        }
353
354        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
355        let mut builder = self.http_client.request(method, &req.url);
356        for (key, value) in req.headers {
357            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
358                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
359            {
360                builder = builder.header(header, header_value);
361            }
362        }
363
364        if let Some(body) = raw_body.clone() {
365            builder = builder.body(body);
366        }
367
368        if let Some(opts) = opts {
369            if let Some(timeout_ms) = opts.timeout_ms {
370                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
371            }
372            if opts.allow_insecure == Some(true) {
373                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
374            }
375            if let Some(follow_redirects) = opts.follow_redirects
376                && !follow_redirects
377            {
378                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
379            }
380        }
381
382        let response = match builder.send() {
383            Ok(resp) => resp,
384            Err(err) => {
385                warn!(url = %req.url, error = %err, "http client request failed");
386                return Err(HttpClientError {
387                    code: "unavailable".into(),
388                    message: err.to_string(),
389                });
390            }
391        };
392
393        let status = response.status().as_u16();
394        let headers_vec = response
395            .headers()
396            .iter()
397            .map(|(k, v)| {
398                (
399                    k.as_str().to_string(),
400                    v.to_str().unwrap_or_default().to_string(),
401                )
402            })
403            .collect::<Vec<_>>();
404        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
405
406        if let Some((meta, true)) = mock_state.take()
407            && let Some(mock) = &self.mocks
408        {
409            let recorded = HttpMockResponse::new(
410                status,
411                headers_vec.clone().into_iter().collect(),
412                body_bytes
413                    .as_ref()
414                    .map(|b| String::from_utf8_lossy(b).into_owned()),
415            );
416            mock.http_record(&meta, &recorded);
417        }
418
419        Ok(HttpResponse {
420            status,
421            headers: headers_vec,
422            body: body_bytes,
423        })
424    }
425}
426
427impl SecretsStoreHost for HostState {
428    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
429        if provider_core_only::is_enabled() {
430            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
431            return Err(SecretsError::Denied);
432        }
433        if !self.config.secrets_policy.is_allowed(&key) {
434            return Err(SecretsError::Denied);
435        }
436        if let Some(mock) = &self.mocks
437            && let Some(value) = mock.secrets_lookup(&key)
438        {
439            return Ok(Some(value.into_bytes()));
440        }
441        let ctx = self.config.tenant_ctx();
442        match read_secret_blocking(&self.secrets, &ctx, &key) {
443            Ok(bytes) => Ok(Some(bytes)),
444            Err(err) => {
445                warn!(secret = %key, error = %err, "secret lookup failed");
446                Err(SecretsError::NotFound)
447            }
448        }
449    }
450}
451
impl HttpClientHost for HostState {
    /// Sends `req` through `HostState::send_http_request` with no request
    /// options, forwarding the tenant context unchanged.
    fn send(
        &mut self,
        req: HttpRequest,
        ctx: Option<HttpTenantCtx>,
    ) -> Result<HttpResponse, HttpClientError> {
        self.send_http_request(req, None, ctx)
    }
}
461
462impl HttpClientHostV1_1 for HostState {
463    fn send(
464        &mut self,
465        req: HttpRequestV1_1,
466        opts: Option<HttpRequestOptionsV1_1>,
467        ctx: Option<HttpTenantCtxV1_1>,
468    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
469        let legacy_req = HttpRequest {
470            method: req.method,
471            url: req.url,
472            headers: req.headers,
473            body: req.body,
474        };
475        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
476            env: ctx.env,
477            tenant: ctx.tenant,
478            tenant_id: ctx.tenant_id,
479            team: ctx.team,
480            team_id: ctx.team_id,
481            user: ctx.user,
482            user_id: ctx.user_id,
483            trace_id: ctx.trace_id,
484            correlation_id: ctx.correlation_id,
485            attributes: ctx.attributes,
486            session_id: ctx.session_id,
487            flow_id: ctx.flow_id,
488            node_id: ctx.node_id,
489            provider_id: ctx.provider_id,
490            deadline_ms: ctx.deadline_ms,
491            attempt: ctx.attempt,
492            idempotency_key: ctx.idempotency_key,
493            impersonation: ctx
494                .impersonation
495                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
496                    actor_id,
497                    reason,
498                }),
499        });
500
501        self.send_http_request(legacy_req, opts, legacy_ctx)
502            .map(|resp| HttpResponseV1_1 {
503                status: resp.status,
504                headers: resp.headers,
505                body: resp.body,
506            })
507            .map_err(|err| HttpClientErrorV1_1 {
508                code: err.code,
509                message: err.message,
510            })
511    }
512}
513
514impl StateStoreHost for HostState {
515    fn read(
516        &mut self,
517        key: HostStateKey,
518        ctx: Option<StateTenantCtx>,
519    ) -> Result<Vec<u8>, StateError> {
520        let store = match self.state_store.as_ref() {
521            Some(store) => store.clone(),
522            None => {
523                return Err(StateError {
524                    code: "unavailable".into(),
525                    message: "state store not configured".into(),
526                });
527            }
528        };
529        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
530            Ok(ctx) => ctx,
531            Err(err) => {
532                return Err(StateError {
533                    code: "invalid-ctx".into(),
534                    message: err.to_string(),
535                });
536            }
537        };
538        let key = StoreStateKey::from(key);
539        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
540            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
541            Ok(None) => Err(StateError {
542                code: "not_found".into(),
543                message: "state key not found".into(),
544            }),
545            Err(err) => Err(StateError {
546                code: "internal".into(),
547                message: err.to_string(),
548            }),
549        }
550    }
551
552    fn write(
553        &mut self,
554        key: HostStateKey,
555        bytes: Vec<u8>,
556        ctx: Option<StateTenantCtx>,
557    ) -> Result<StateOpAck, StateError> {
558        let store = match self.state_store.as_ref() {
559            Some(store) => store.clone(),
560            None => {
561                return Err(StateError {
562                    code: "unavailable".into(),
563                    message: "state store not configured".into(),
564                });
565            }
566        };
567        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
568            Ok(ctx) => ctx,
569            Err(err) => {
570                return Err(StateError {
571                    code: "invalid-ctx".into(),
572                    message: err.to_string(),
573                });
574            }
575        };
576        let key = StoreStateKey::from(key);
577        let value = serde_json::from_slice(&bytes)
578            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
579        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
580            Ok(()) => Ok(StateOpAck::Ok),
581            Err(err) => Err(StateError {
582                code: "internal".into(),
583                message: err.to_string(),
584            }),
585        }
586    }
587
588    fn delete(
589        &mut self,
590        key: HostStateKey,
591        ctx: Option<StateTenantCtx>,
592    ) -> Result<StateOpAck, StateError> {
593        let store = match self.state_store.as_ref() {
594            Some(store) => store.clone(),
595            None => {
596                return Err(StateError {
597                    code: "unavailable".into(),
598                    message: "state store not configured".into(),
599                });
600            }
601        };
602        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
603            Ok(ctx) => ctx,
604            Err(err) => {
605                return Err(StateError {
606                    code: "invalid-ctx".into(),
607                    message: err.to_string(),
608                });
609            }
610        };
611        let key = StoreStateKey::from(key);
612        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
613            Ok(_) => Ok(StateOpAck::Ok),
614            Err(err) => Err(StateError {
615                code: "internal".into(),
616                message: err.to_string(),
617            }),
618        }
619    }
620}
621
622impl TelemetryLoggerHost for HostState {
623    fn log(
624        &mut self,
625        span: TelemetrySpanContext,
626        fields: Vec<(String, String)>,
627        _ctx: Option<TelemetryTenantCtx>,
628    ) -> Result<TelemetryAck, TelemetryError> {
629        if let Some(mock) = &self.mocks
630            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
631        {
632            return Ok(TelemetryAck::Ok);
633        }
634        let mut map = serde_json::Map::new();
635        for (k, v) in fields {
636            map.insert(k, Value::String(v));
637        }
638        tracing::info!(
639            tenant = %span.tenant,
640            flow_id = %span.flow_id,
641            node = ?span.node_id,
642            provider = %span.provider,
643            fields = %serde_json::Value::Object(map.clone()),
644            "telemetry log from pack"
645        );
646        Ok(TelemetryAck::Ok)
647    }
648}
649
650impl RunnerHostHttp for HostState {
651    fn request(
652        &mut self,
653        method: String,
654        url: String,
655        headers: Vec<String>,
656        body: Option<Vec<u8>>,
657    ) -> Result<Vec<u8>, String> {
658        let req = HttpRequest {
659            method,
660            url,
661            headers: headers
662                .chunks(2)
663                .filter_map(|chunk| {
664                    if chunk.len() == 2 {
665                        Some((chunk[0].clone(), chunk[1].clone()))
666                    } else {
667                        None
668                    }
669                })
670                .collect(),
671            body,
672        };
673        match HttpClientHost::send(self, req, None) {
674            Ok(resp) => Ok(resp.body.unwrap_or_default()),
675            Err(err) => Err(err.message),
676        }
677    }
678}
679
/// Stub key-value host: nothing is stored and every lookup misses.
impl RunnerHostKv for HostState {
    /// Always returns `None`; the namespace and key are ignored.
    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
        None
    }

    /// No-op; the value is discarded.
    fn put(&mut self, _ns: String, _key: String, _val: String) {}
}
687
/// Result of loading `manifest.cbor`: either the current manifest format or
/// the legacy one, each paired with the flows loaded alongside it.
enum ManifestLoad {
    /// `manifest.cbor` decoded with `decode_pack_manifest`.
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// `manifest.cbor` decoded as a legacy `greentic_pack::builder` manifest
    /// after the current-format decode failed.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
698
699fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
700    let mut archive = ZipArchive::new(File::open(path)?)
701        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
702    let bytes = read_entry(&mut archive, "manifest.cbor")
703        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
704    match decode_pack_manifest(&bytes) {
705        Ok(manifest) => {
706            let cache = PackFlows::from_manifest(manifest.clone());
707            Ok(ManifestLoad::New {
708                manifest: Box::new(manifest),
709                flows: cache,
710            })
711        }
712        Err(err) => {
713            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
714            // Fall back to legacy pack manifest
715            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
716                .context("failed to decode legacy pack manifest from manifest.cbor")?;
717            let flows = load_legacy_flows(&mut archive, &legacy)?;
718            Ok(ManifestLoad::Legacy {
719                manifest: Box::new(legacy),
720                flows,
721            })
722        }
723    }
724}
725
726fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
727    let manifest_path = root.join("manifest.cbor");
728    let bytes = std::fs::read(&manifest_path)
729        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
730    match decode_pack_manifest(&bytes) {
731        Ok(manifest) => {
732            let cache = PackFlows::from_manifest(manifest.clone());
733            Ok(ManifestLoad::New {
734                manifest: Box::new(manifest),
735                flows: cache,
736            })
737        }
738        Err(err) => {
739            tracing::debug!(
740                error = %err,
741                pack = %root.display(),
742                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
743            );
744            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
745                .context("failed to decode legacy pack manifest from manifest.cbor")?;
746            let flows = load_legacy_flows_from_dir(root, &legacy)?;
747            Ok(ManifestLoad::Legacy {
748                manifest: Box::new(legacy),
749                flows,
750            })
751        }
752    }
753}
754
755fn load_legacy_flows(
756    archive: &mut ZipArchive<File>,
757    manifest: &legacy_pack::PackManifest,
758) -> Result<PackFlows> {
759    let mut flows = HashMap::new();
760    let mut descriptors = Vec::new();
761
762    for entry in &manifest.flows {
763        let bytes = read_entry(archive, &entry.file_json)
764            .with_context(|| format!("missing flow json {}", entry.file_json))?;
765        let doc: FlowDoc = serde_json::from_slice(&bytes)
766            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
767        let normalized = normalize_flow_doc(doc);
768        let flow_ir = flow_doc_to_ir(normalized)?;
769        let flow = flow_ir_to_flow(flow_ir)?;
770
771        descriptors.push(FlowDescriptor {
772            id: entry.id.clone(),
773            flow_type: entry.kind.clone(),
774            pack_id: manifest.meta.pack_id.clone(),
775            profile: manifest.meta.pack_id.clone(),
776            version: manifest.meta.version.to_string(),
777            description: None,
778        });
779        flows.insert(entry.id.clone(), flow);
780    }
781
782    let mut entry_flows = manifest.meta.entry_flows.clone();
783    if entry_flows.is_empty() {
784        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
785    }
786    let metadata = PackMetadata {
787        pack_id: manifest.meta.pack_id.clone(),
788        version: manifest.meta.version.to_string(),
789        entry_flows,
790        secret_requirements: Vec::new(),
791    };
792
793    Ok(PackFlows {
794        descriptors,
795        flows,
796        metadata,
797    })
798}
799
800fn load_legacy_flows_from_dir(
801    root: &Path,
802    manifest: &legacy_pack::PackManifest,
803) -> Result<PackFlows> {
804    let mut flows = HashMap::new();
805    let mut descriptors = Vec::new();
806
807    for entry in &manifest.flows {
808        let path = root.join(&entry.file_json);
809        let bytes = std::fs::read(&path)
810            .with_context(|| format!("missing flow json {}", path.display()))?;
811        let doc: FlowDoc = serde_json::from_slice(&bytes)
812            .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
813        let normalized = normalize_flow_doc(doc);
814        let flow_ir = flow_doc_to_ir(normalized)?;
815        let flow = flow_ir_to_flow(flow_ir)?;
816
817        descriptors.push(FlowDescriptor {
818            id: entry.id.clone(),
819            flow_type: entry.kind.clone(),
820            pack_id: manifest.meta.pack_id.clone(),
821            profile: manifest.meta.pack_id.clone(),
822            version: manifest.meta.version.to_string(),
823            description: None,
824        });
825        flows.insert(entry.id.clone(), flow);
826    }
827
828    let mut entry_flows = manifest.meta.entry_flows.clone();
829    if entry_flows.is_empty() {
830        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
831    }
832    let metadata = PackMetadata {
833        pack_id: manifest.meta.pack_id.clone(),
834        version: manifest.meta.version.to_string(),
835        entry_flows,
836        secret_requirements: Vec::new(),
837    };
838
839    Ok(PackFlows {
840        descriptors,
841        flows,
842        metadata,
843    })
844}
845
/// Data stored in each wasmtime `Store` used to run a pack component.
///
/// Bundles the host-side state with the WASI context and resource table that
/// `WasiView::ctx` hands out together.
846pub struct ComponentState {
    /// Host state; the host interface accessors registered on the linker resolve to this field.
847    pub host: HostState,
    /// WASI context produced from the runner's WASI policy in `ComponentState::new`.
848    wasi_ctx: WasiCtx,
    /// Resource table exposed alongside `wasi_ctx`.
849    resource_table: ResourceTable,
850}
851
852impl ComponentState {
853    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
854        let wasi_ctx = policy
855            .instantiate()
856            .context("failed to build WASI context")?;
857        Ok(Self {
858            host,
859            wasi_ctx,
860            resource_table: ResourceTable::new(),
861        })
862    }
863
864    fn host_mut(&mut self) -> &mut HostState {
865        &mut self.host
866    }
867
868    fn should_cancel_host(&mut self) -> bool {
869        false
870    }
871
872    fn yield_now_host(&mut self) {
873        // no-op cooperative yield
874    }
875}
876
877impl component_api::v0_4::greentic::component::control::Host for ComponentState {
878    fn should_cancel(&mut self) -> bool {
879        self.should_cancel_host()
880    }
881
882    fn yield_now(&mut self) {
883        self.yield_now_host();
884    }
885}
886
887impl component_api::v0_5::greentic::component::control::Host for ComponentState {
888    fn should_cancel(&mut self) -> bool {
889        self.should_cancel_host()
890    }
891
892    fn yield_now(&mut self) {
893        self.yield_now_host();
894    }
895}
896
897fn add_component_control_instance(
898    linker: &mut Linker<ComponentState>,
899    name: &str,
900) -> wasmtime::Result<()> {
901    let mut inst = linker.instance(name)?;
902    inst.func_wrap(
903        "should-cancel",
904        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
905            let host = caller.data_mut();
906            Ok((host.should_cancel_host(),))
907        },
908    )?;
909    inst.func_wrap(
910        "yield-now",
911        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
912            let host = caller.data_mut();
913            host.yield_now_host();
914            Ok(())
915        },
916    )?;
917    Ok(())
918}
919
920fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
921    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
922    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
923    Ok(())
924}
925
/// Registers the host imports available to pack components on `linker`.
///
/// WASI is added first, followed by the v1 host interfaces through
/// `add_all_v1_to_linker`. Every accessor supplied here hands out the store's
/// `HostState` via `host_mut`.
///
/// `allow_state_store` controls whether a state-store accessor is supplied at all;
/// the OAuth broker accessor is always left unset by this function.
///
/// # Errors
///
/// Propagates any failure from either registration step.
926pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
927    add_wasi_to_linker(linker)?;
928    add_all_v1_to_linker(
929        linker,
930        HostFns {
931            http_client_v1_1: Some(|state| state.host_mut()),
932            http_client: Some(|state| state.host_mut()),
933            oauth_broker: None,
934            runner_host_http: Some(|state| state.host_mut()),
935            runner_host_kv: Some(|state| state.host_mut()),
936            telemetry_logger: Some(|state| state.host_mut()),
            // `then_some` yields `None` when the state store is not allowed for this component.
937            state_store: allow_state_store.then_some(|state| state.host_mut()),
938            secrets_store: Some(|state| state.host_mut()),
939        },
940    )?;
941    Ok(())
942}
943
944impl OAuthHostContext for ComponentState {
945    fn tenant_id(&self) -> &str {
946        &self.host.config.tenant
947    }
948
949    fn env(&self) -> &str {
950        &self.host.default_env
951    }
952
953    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
954        &mut self.host.oauth_host
955    }
956
957    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
958        self.host.oauth_config.as_ref()
959    }
960}
961
962impl WasiView for ComponentState {
963    fn ctx(&mut self) -> WasiCtxView<'_> {
964        WasiCtxView {
965            ctx: &mut self.wasi_ctx,
966            table: &mut self.resource_table,
967        }
968    }
969}
970
// NOTE(review): `Send` and `Sync` are asserted manually here rather than derived from the
// field types. Nothing in the visible code states why every field of `ComponentState` may
// be moved to, or shared with, another thread. TODO: confirm the invariant and record it
// as a `SAFETY:` comment on each impl.
971#[allow(unsafe_code)]
972unsafe impl Send for ComponentState {}
973#[allow(unsafe_code)]
974unsafe impl Sync for ComponentState {}
975
976impl PackRuntime {
977    fn allows_state_store(&self, component_ref: &str) -> bool {
978        if self.state_store.is_none() {
979            return false;
980        }
981        if !self.config.state_store_policy.allow {
982            return false;
983        }
984        let Some(manifest) = self.component_manifests.get(component_ref) else {
985            return false;
986        };
987        manifest
988            .capabilities
989            .host
990            .state
991            .as_ref()
992            .map(|caps| caps.read || caps.write)
993            .unwrap_or(false)
994    }
995
996    #[allow(clippy::too_many_arguments)]
997    pub async fn load(
998        path: impl AsRef<Path>,
999        config: Arc<HostConfig>,
1000        mocks: Option<Arc<MockLayer>>,
1001        archive_source: Option<&Path>,
1002        session_store: Option<DynSessionStore>,
1003        state_store: Option<DynStateStore>,
1004        wasi_policy: Arc<RunnerWasiPolicy>,
1005        secrets: DynSecretsManager,
1006        oauth_config: Option<OAuthBrokerConfig>,
1007        verify_archive: bool,
1008        component_resolution: ComponentResolution,
1009    ) -> Result<Self> {
1010        let path = path.as_ref();
1011        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1012        let path_meta = std::fs::metadata(&safe_path).ok();
1013        let is_dir = path_meta
1014            .as_ref()
1015            .map(|meta| meta.is_dir())
1016            .unwrap_or(false);
1017        let is_component = !is_dir
1018            && safe_path
1019                .extension()
1020                .and_then(|ext| ext.to_str())
1021                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1022                .unwrap_or(false);
1023        let archive_hint_path = if let Some(source) = archive_source {
1024            let (_, normalized) = normalize_pack_path(source)?;
1025            Some(normalized)
1026        } else if is_component || is_dir {
1027            None
1028        } else {
1029            Some(safe_path.clone())
1030        };
1031        let archive_hint = archive_hint_path.as_deref();
1032        if verify_archive {
1033            if let Some(verify_target) = archive_hint.and_then(|p| {
1034                std::fs::metadata(p)
1035                    .ok()
1036                    .filter(|meta| meta.is_file())
1037                    .map(|_| p)
1038            }) {
1039                verify::verify_pack(verify_target).await?;
1040                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1041            } else {
1042                tracing::debug!("skipping archive verification (no archive source)");
1043            }
1044        }
1045        let engine = Engine::default();
1046        let engine_profile =
1047            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1048        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1049        let mut metadata = PackMetadata::fallback(&safe_path);
1050        let mut manifest = None;
1051        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1052        let mut flows = None;
1053        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1054            if is_dir {
1055                Some(safe_path.clone())
1056            } else {
1057                None
1058            }
1059        });
1060
1061        if let Some(root) = materialized_root.as_ref() {
1062            match load_manifest_and_flows_from_dir(root) {
1063                Ok(ManifestLoad::New {
1064                    manifest: m,
1065                    flows: cache,
1066                }) => {
1067                    metadata = cache.metadata.clone();
1068                    manifest = Some(*m);
1069                    flows = Some(cache);
1070                }
1071                Ok(ManifestLoad::Legacy {
1072                    manifest: m,
1073                    flows: cache,
1074                }) => {
1075                    metadata = cache.metadata.clone();
1076                    legacy_manifest = Some(m);
1077                    flows = Some(cache);
1078                }
1079                Err(err) => {
1080                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1081                }
1082            }
1083        }
1084
1085        if manifest.is_none()
1086            && legacy_manifest.is_none()
1087            && let Some(archive_path) = archive_hint
1088        {
1089            match load_manifest_and_flows(archive_path) {
1090                Ok(ManifestLoad::New {
1091                    manifest: m,
1092                    flows: cache,
1093                }) => {
1094                    metadata = cache.metadata.clone();
1095                    manifest = Some(*m);
1096                    flows = Some(cache);
1097                }
1098                Ok(ManifestLoad::Legacy {
1099                    manifest: m,
1100                    flows: cache,
1101                }) => {
1102                    metadata = cache.metadata.clone();
1103                    legacy_manifest = Some(m);
1104                    flows = Some(cache);
1105                }
1106                Err(err) => {
1107                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1108                }
1109            }
1110        }
1111        let mut pack_lock = None;
1112        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1113            pack_lock = load_pack_lock(&root)?;
1114            if pack_lock.is_some() {
1115                break;
1116            }
1117        }
1118        let component_sources_payload = if pack_lock.is_none() {
1119            if let Some(manifest) = manifest.as_ref() {
1120                manifest
1121                    .get_component_sources_v1()
1122                    .context("invalid component sources extension")?
1123            } else {
1124                None
1125            }
1126        } else {
1127            None
1128        };
1129        let component_sources = if let Some(lock) = pack_lock.as_ref() {
1130            Some(component_sources_table_from_pack_lock(
1131                lock,
1132                component_resolution.allow_missing_hash,
1133            )?)
1134        } else {
1135            component_sources_table(component_sources_payload.as_ref())?
1136        };
1137        let components = if is_component {
1138            let wasm_bytes = fs::read(&safe_path).await?;
1139            metadata = PackMetadata::from_wasm(&wasm_bytes)
1140                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1141            let name = safe_path
1142                .file_stem()
1143                .map(|s| s.to_string_lossy().to_string())
1144                .unwrap_or_else(|| "component".to_string());
1145            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1146            let mut map = HashMap::new();
1147            map.insert(
1148                name.clone(),
1149                PackComponent {
1150                    name,
1151                    version: metadata.version.clone(),
1152                    component,
1153                },
1154            );
1155            map
1156        } else {
1157            let specs = component_specs(
1158                manifest.as_ref(),
1159                legacy_manifest.as_deref(),
1160                component_sources_payload.as_ref(),
1161                pack_lock.as_ref(),
1162            );
1163            if specs.is_empty() {
1164                HashMap::new()
1165            } else {
1166                let mut loaded = HashMap::new();
1167                let mut missing: HashSet<String> =
1168                    specs.iter().map(|spec| spec.id.clone()).collect();
1169                let mut searched = Vec::new();
1170
1171                if !component_resolution.overrides.is_empty() {
1172                    load_components_from_overrides(
1173                        &cache,
1174                        &engine,
1175                        &component_resolution.overrides,
1176                        &specs,
1177                        &mut missing,
1178                        &mut loaded,
1179                    )
1180                    .await?;
1181                    searched.push("override map".to_string());
1182                }
1183
1184                if let Some(component_sources) = component_sources.as_ref() {
1185                    load_components_from_sources(
1186                        &cache,
1187                        &engine,
1188                        component_sources,
1189                        &component_resolution,
1190                        &specs,
1191                        &mut missing,
1192                        &mut loaded,
1193                        materialized_root.as_deref(),
1194                        archive_hint,
1195                    )
1196                    .await?;
1197                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1198                }
1199
1200                if let Some(root) = materialized_root.as_ref() {
1201                    load_components_from_dir(
1202                        &cache,
1203                        &engine,
1204                        root,
1205                        &specs,
1206                        &mut missing,
1207                        &mut loaded,
1208                    )
1209                    .await?;
1210                    searched.push(format!("components dir {}", root.display()));
1211                }
1212
1213                if let Some(archive_path) = archive_hint {
1214                    load_components_from_archive(
1215                        &cache,
1216                        &engine,
1217                        archive_path,
1218                        &specs,
1219                        &mut missing,
1220                        &mut loaded,
1221                    )
1222                    .await?;
1223                    searched.push(format!("archive {}", archive_path.display()));
1224                }
1225
1226                if !missing.is_empty() {
1227                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1228                    let sources = if searched.is_empty() {
1229                        "no component sources".to_string()
1230                    } else {
1231                        searched.join(", ")
1232                    };
1233                    bail!(
1234                        "components missing: {}; looked in {}",
1235                        missing_list,
1236                        sources
1237                    );
1238                }
1239
1240                loaded
1241            }
1242        };
1243        let http_client = Arc::clone(&HTTP_CLIENT);
1244        let mut component_manifests = HashMap::new();
1245        if let Some(manifest) = manifest.as_ref() {
1246            for component in &manifest.components {
1247                component_manifests.insert(component.id.as_str().to_string(), component.clone());
1248            }
1249        }
1250        Ok(Self {
1251            path: safe_path,
1252            archive_path: archive_hint.map(Path::to_path_buf),
1253            config,
1254            engine,
1255            metadata,
1256            manifest,
1257            legacy_manifest,
1258            component_manifests,
1259            mocks,
1260            flows,
1261            components,
1262            http_client,
1263            pre_cache: Mutex::new(HashMap::new()),
1264            session_store,
1265            state_store,
1266            wasi_policy,
1267            provider_registry: RwLock::new(None),
1268            secrets,
1269            oauth_config,
1270            cache,
1271        })
1272    }
1273
1274    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1275        if let Some(cache) = &self.flows {
1276            return Ok(cache.descriptors.clone());
1277        }
1278        if let Some(manifest) = &self.manifest {
1279            let descriptors = manifest
1280                .flows
1281                .iter()
1282                .map(|flow| FlowDescriptor {
1283                    id: flow.id.as_str().to_string(),
1284                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1285                    pack_id: manifest.pack_id.as_str().to_string(),
1286                    profile: manifest.pack_id.as_str().to_string(),
1287                    version: manifest.version.to_string(),
1288                    description: None,
1289                })
1290                .collect();
1291            return Ok(descriptors);
1292        }
1293        Ok(Vec::new())
1294    }
1295
1296    #[allow(dead_code)]
1297    pub async fn run_flow(
1298        &self,
1299        flow_id: &str,
1300        input: serde_json::Value,
1301    ) -> Result<serde_json::Value> {
1302        let pack = Arc::new(
1303            PackRuntime::load(
1304                &self.path,
1305                Arc::clone(&self.config),
1306                self.mocks.clone(),
1307                self.archive_path.as_deref(),
1308                self.session_store.clone(),
1309                self.state_store.clone(),
1310                Arc::clone(&self.wasi_policy),
1311                self.secrets.clone(),
1312                self.oauth_config.clone(),
1313                false,
1314                ComponentResolution::default(),
1315            )
1316            .await?,
1317        );
1318
1319        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1320        let retry_config = self.config.retry_config().into();
1321        let mocks = pack.mocks.as_deref();
1322        let tenant = self.config.tenant.as_str();
1323
1324        let ctx = FlowContext {
1325            tenant,
1326            pack_id: pack.metadata().pack_id.as_str(),
1327            flow_id,
1328            node_id: None,
1329            tool: None,
1330            action: None,
1331            session_id: None,
1332            provider_id: None,
1333            retry_config,
1334            observer: None,
1335            mocks,
1336        };
1337
1338        let execution = engine.execute(ctx, input).await?;
1339        match execution.status {
1340            FlowStatus::Completed => Ok(execution.output),
1341            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1342                "status": "pending",
1343                "reason": wait.reason,
1344                "resume": wait.snapshot,
1345                "response": execution.output,
1346            })),
1347        }
1348    }
1349
1350    pub async fn invoke_component(
1351        &self,
1352        component_ref: &str,
1353        ctx: ComponentExecCtx,
1354        operation: &str,
1355        _config_json: Option<String>,
1356        input_json: String,
1357    ) -> Result<Value> {
1358        let pack_component = self
1359            .components
1360            .get(component_ref)
1361            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1362
1363        let mut linker = Linker::new(&self.engine);
1364        let allow_state_store = self.allows_state_store(component_ref);
1365        register_all(&mut linker, allow_state_store)?;
1366        add_component_control_to_linker(&mut linker)?;
1367
1368        let host_state = HostState::new(
1369            Arc::clone(&self.config),
1370            Arc::clone(&self.http_client),
1371            self.mocks.clone(),
1372            self.session_store.clone(),
1373            self.state_store.clone(),
1374            Arc::clone(&self.secrets),
1375            self.oauth_config.clone(),
1376            Some(ctx.clone()),
1377        )?;
1378        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1379        let mut store = wasmtime::Store::new(&self.engine, store_state);
1380        let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1381        let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1382            Ok(pre) => {
1383                let bindings = pre.instantiate_async(&mut store).await?;
1384                let node = bindings.greentic_component_node();
1385                let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1386                let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1387                component_api::invoke_result_from_v0_5(result)
1388            }
1389            Err(err) => {
1390                if is_missing_node_export(&err, "0.5.0") {
1391                    let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1392                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
1393                        component_api::v0_4::ComponentPre::new(pre_instance)?;
1394                    let bindings = pre.instantiate_async(&mut store).await?;
1395                    let node = bindings.greentic_component_node();
1396                    let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1397                    let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1398                    component_api::invoke_result_from_v0_4(result)
1399                } else {
1400                    return Err(err);
1401                }
1402            }
1403        };
1404
1405        match result {
1406            InvokeResult::Ok(body) => {
1407                if body.is_empty() {
1408                    return Ok(Value::Null);
1409                }
1410                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1411            }
1412            InvokeResult::Err(NodeError {
1413                code,
1414                message,
1415                retryable,
1416                backoff_ms,
1417                details,
1418            }) => {
1419                let mut obj = serde_json::Map::new();
1420                obj.insert("ok".into(), Value::Bool(false));
1421                let mut error = serde_json::Map::new();
1422                error.insert("code".into(), Value::String(code));
1423                error.insert("message".into(), Value::String(message));
1424                error.insert("retryable".into(), Value::Bool(retryable));
1425                if let Some(backoff) = backoff_ms {
1426                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1427                }
1428                if let Some(details) = details {
1429                    error.insert(
1430                        "details".into(),
1431                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1432                    );
1433                }
1434                obj.insert("error".into(), Value::Object(error));
1435                Ok(Value::Object(obj))
1436            }
1437        }
1438    }
1439
1440    pub fn resolve_provider(
1441        &self,
1442        provider_id: Option<&str>,
1443        provider_type: Option<&str>,
1444    ) -> Result<ProviderBinding> {
1445        let registry = self.provider_registry()?;
1446        registry.resolve(provider_id, provider_type)
1447    }
1448
1449    pub async fn invoke_provider(
1450        &self,
1451        binding: &ProviderBinding,
1452        ctx: ComponentExecCtx,
1453        op: &str,
1454        input_json: Vec<u8>,
1455    ) -> Result<Value> {
1456        let component_ref = &binding.component_ref;
1457        let pack_component = self
1458            .components
1459            .get(component_ref)
1460            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1461
1462        let mut linker = Linker::new(&self.engine);
1463        let allow_state_store = self.allows_state_store(component_ref);
1464        register_all(&mut linker, allow_state_store)?;
1465        add_component_control_to_linker(&mut linker)?;
1466        let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1467        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1468
1469        let host_state = HostState::new(
1470            Arc::clone(&self.config),
1471            Arc::clone(&self.http_client),
1472            self.mocks.clone(),
1473            self.session_store.clone(),
1474            self.state_store.clone(),
1475            Arc::clone(&self.secrets),
1476            self.oauth_config.clone(),
1477            Some(ctx),
1478        )?;
1479        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1480        let mut store = wasmtime::Store::new(&self.engine, store_state);
1481        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1482        let provider = bindings.greentic_provider_core_schema_core_api();
1483
1484        let result = provider.call_invoke(&mut store, op, &input_json)?;
1485        deserialize_json_bytes(result)
1486    }
1487
1488    fn provider_registry(&self) -> Result<ProviderRegistry> {
1489        if let Some(registry) = self.provider_registry.read().clone() {
1490            return Ok(registry);
1491        }
1492        let manifest = self
1493            .manifest
1494            .as_ref()
1495            .context("pack manifest required for provider resolution")?;
1496        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1497        let registry = ProviderRegistry::new(
1498            manifest,
1499            self.state_store.clone(),
1500            &self.config.tenant,
1501            &env,
1502        )?;
1503        *self.provider_registry.write() = Some(registry.clone());
1504        Ok(registry)
1505    }
1506
1507    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1508        if let Some(cache) = &self.flows {
1509            return cache
1510                .flows
1511                .get(flow_id)
1512                .cloned()
1513                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1514        }
1515        if let Some(manifest) = &self.manifest {
1516            let entry = manifest
1517                .flows
1518                .iter()
1519                .find(|f| f.id.as_str() == flow_id)
1520                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1521            return Ok(entry.flow.clone());
1522        }
1523        bail!("flow '{flow_id}' not available (pack exports disabled)")
1524    }
1525
    /// Returns the metadata recorded for this pack when it was loaded.
1526    pub fn metadata(&self) -> &PackMetadata {
1527        &self.metadata
1528    }
1529
1530    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1531        &self.metadata.secret_requirements
1532    }
1533
1534    pub fn missing_secrets(
1535        &self,
1536        tenant_ctx: &TypesTenantCtx,
1537    ) -> Vec<greentic_types::SecretRequirement> {
1538        let env = tenant_ctx.env.as_str().to_string();
1539        let tenant = tenant_ctx.tenant.as_str().to_string();
1540        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1541        self.required_secrets()
1542            .iter()
1543            .filter(|req| {
1544                // scope must match current context if provided
1545                if let Some(scope) = &req.scope {
1546                    if scope.env != env {
1547                        return false;
1548                    }
1549                    if scope.tenant != tenant {
1550                        return false;
1551                    }
1552                    if let Some(ref team_req) = scope.team
1553                        && team.as_ref() != Some(team_req)
1554                    {
1555                        return false;
1556                    }
1557                }
1558                let ctx = self.config.tenant_ctx();
1559                read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1560            })
1561            .cloned()
1562            .collect()
1563    }
1564
1565    pub fn for_component_test(
1566        components: Vec<(String, PathBuf)>,
1567        flows: HashMap<String, FlowIR>,
1568        pack_id: &str,
1569        config: Arc<HostConfig>,
1570    ) -> Result<Self> {
1571        let engine = Engine::default();
1572        let engine_profile =
1573            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1574        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1575        let mut component_map = HashMap::new();
1576        for (name, path) in components {
1577            if !path.exists() {
1578                bail!("component artifact missing: {}", path.display());
1579            }
1580            let wasm_bytes = std::fs::read(&path)?;
1581            let component = Arc::new(
1582                Component::from_binary(&engine, &wasm_bytes)
1583                    .with_context(|| format!("failed to compile component {}", path.display()))?,
1584            );
1585            component_map.insert(
1586                name.clone(),
1587                PackComponent {
1588                    name,
1589                    version: "0.0.0".into(),
1590                    component,
1591                },
1592            );
1593        }
1594
1595        let mut flow_map = HashMap::new();
1596        let mut descriptors = Vec::new();
1597        for (id, ir) in flows {
1598            let flow_type = ir.flow_type.clone();
1599            let flow = flow_ir_to_flow(ir)?;
1600            flow_map.insert(id.clone(), flow);
1601            descriptors.push(FlowDescriptor {
1602                id: id.clone(),
1603                flow_type,
1604                pack_id: pack_id.to_string(),
1605                profile: "test".into(),
1606                version: "0.0.0".into(),
1607                description: None,
1608            });
1609        }
1610        let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1611        let metadata = PackMetadata {
1612            pack_id: pack_id.to_string(),
1613            version: "0.0.0".into(),
1614            entry_flows,
1615            secret_requirements: Vec::new(),
1616        };
1617        let flows_cache = PackFlows {
1618            descriptors: descriptors.clone(),
1619            flows: flow_map,
1620            metadata: metadata.clone(),
1621        };
1622
1623        Ok(Self {
1624            path: PathBuf::new(),
1625            archive_path: None,
1626            config,
1627            engine,
1628            metadata,
1629            manifest: None,
1630            legacy_manifest: None,
1631            component_manifests: HashMap::new(),
1632            mocks: None,
1633            flows: Some(flows_cache),
1634            components: component_map,
1635            http_client: Arc::clone(&HTTP_CLIENT),
1636            pre_cache: Mutex::new(HashMap::new()),
1637            session_store: None,
1638            state_store: None,
1639            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1640            provider_registry: RwLock::new(None),
1641            secrets: crate::secrets::default_manager()?,
1642            oauth_config: None,
1643            cache,
1644        })
1645    }
1646}
1647
1648fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1649    let message = err.to_string();
1650    message.contains("no exported instance named")
1651        && message.contains(&format!("greentic:component/node@{version}"))
1652}
1653
/// Flow data resolved for a pack: descriptors, flow definitions, and the
/// pack-level metadata they were built alongside.
struct PackFlows {
    /// One descriptor per flow (id, flow type, pack id, profile, version).
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Pack-level metadata.
    metadata: PackMetadata,
}
1659
/// Manifest extension keys that `flows_from_runtime_extension` accepts as
/// carrying runtime flow definitions. The first manifest extension whose key
/// equals one of these is the one that gets decoded.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1665
/// Object-shaped runtime flow extension payload: `{ "flows": [...] }`.
///
/// This is the first shape `decode_runtime_flow_extension` tries.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// Runtime flows contained in the bundle.
    flows: Vec<RuntimeFlow>,
}
1670
/// Runtime flow as it appears in the extension payload, before conversion to
/// a [`Flow`] by `runtime_flow_to_flow`.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Raw flow id; parsed into a `FlowId` during conversion.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Optional schema version; conversion falls back to `"1.0"` when absent.
    #[serde(default)]
    schema_version: Option<String>,
    /// Optional start node; used as the `"default"` entrypoint when
    /// `entrypoints` is empty.
    #[serde(default)]
    start: Option<String>,
    /// Explicit entrypoints; empty when the key is missing.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by their raw node id.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; conversion uses the default value when absent.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
1686
/// Single node of a [`RuntimeFlow`] as it appears in the extension payload.
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Raw component id; also accepted under the key `component`.
    #[serde(alias = "component")]
    component_id: String,
    /// Optional operation name; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Operation payload; also accepted under `payload` or `input`.
    /// Becomes the node's input mapping during conversion.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Optional routing; conversion falls back to `Routing::End`.
    #[serde(default)]
    routing: Option<Routing>,
    /// Optional telemetry hints; conversion uses the default value when absent.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
1700
1701fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1702    if bytes.is_empty() {
1703        return Ok(Value::Null);
1704    }
1705    serde_json::from_slice(&bytes).or_else(|_| {
1706        String::from_utf8(bytes)
1707            .map(Value::String)
1708            .map_err(|err| anyhow!(err))
1709    })
1710}
1711
1712impl PackFlows {
1713    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1714        if let Some(flows) = flows_from_runtime_extension(&manifest) {
1715            return flows;
1716        }
1717        let descriptors = manifest
1718            .flows
1719            .iter()
1720            .map(|entry| FlowDescriptor {
1721                id: entry.id.as_str().to_string(),
1722                flow_type: flow_kind_to_str(entry.kind).to_string(),
1723                pack_id: manifest.pack_id.as_str().to_string(),
1724                profile: manifest.pack_id.as_str().to_string(),
1725                version: manifest.version.to_string(),
1726                description: None,
1727            })
1728            .collect();
1729        let mut flows = HashMap::new();
1730        for entry in &manifest.flows {
1731            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1732        }
1733        Self {
1734            metadata: PackMetadata::from_manifest(&manifest),
1735            descriptors,
1736            flows,
1737        }
1738    }
1739}
1740
1741fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1742    let extensions = manifest.extensions.as_ref()?;
1743    let extension = extensions.iter().find_map(|(key, ext)| {
1744        if RUNTIME_FLOW_EXTENSION_IDS
1745            .iter()
1746            .any(|candidate| candidate == key)
1747        {
1748            Some(ext)
1749        } else {
1750            None
1751        }
1752    })?;
1753    let runtime_flows = match decode_runtime_flow_extension(extension) {
1754        Some(flows) if !flows.is_empty() => flows,
1755        _ => return None,
1756    };
1757
1758    let descriptors = runtime_flows
1759        .iter()
1760        .map(|flow| FlowDescriptor {
1761            id: flow.id.as_str().to_string(),
1762            flow_type: flow_kind_to_str(flow.kind).to_string(),
1763            pack_id: manifest.pack_id.as_str().to_string(),
1764            profile: manifest.pack_id.as_str().to_string(),
1765            version: manifest.version.to_string(),
1766            description: None,
1767        })
1768        .collect::<Vec<_>>();
1769    let flows = runtime_flows
1770        .into_iter()
1771        .map(|flow| (flow.id.as_str().to_string(), flow))
1772        .collect();
1773
1774    Some(PackFlows {
1775        metadata: PackMetadata::from_manifest(manifest),
1776        descriptors,
1777        flows,
1778    })
1779}
1780
1781fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1782    let value = match extension.inline.as_ref()? {
1783        ExtensionInline::Other(value) => value.clone(),
1784        _ => return None,
1785    };
1786
1787    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1788        return Some(collect_runtime_flows(bundle.flows));
1789    }
1790
1791    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1792        return Some(collect_runtime_flows(flows));
1793    }
1794
1795    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1796        return Some(flows);
1797    }
1798
1799    warn!(
1800        extension = %extension.kind,
1801        version = %extension.version,
1802        "runtime flow extension present but could not be decoded"
1803    );
1804    None
1805}
1806
1807fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1808    flows
1809        .into_iter()
1810        .filter_map(|flow| match runtime_flow_to_flow(flow) {
1811            Ok(flow) => Some(flow),
1812            Err(err) => {
1813                warn!(error = %err, "failed to decode runtime flow");
1814                None
1815            }
1816        })
1817        .collect()
1818}
1819
1820fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1821    let flow_id = FlowId::from_str(&runtime.id)
1822        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1823    let mut entrypoints = runtime.entrypoints;
1824    if entrypoints.is_empty()
1825        && let Some(start) = &runtime.start
1826    {
1827        entrypoints.insert("default".into(), Value::String(start.clone()));
1828    }
1829
1830    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1831    for (id, node) in runtime.nodes {
1832        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1833        let component_id = ComponentId::from_str(&node.component_id)
1834            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1835        let component = FlowComponentRef {
1836            id: component_id,
1837            pack_alias: None,
1838            operation: node.operation_name,
1839        };
1840        let routing = node.routing.unwrap_or(Routing::End);
1841        let telemetry = node.telemetry.unwrap_or_default();
1842        nodes.insert(
1843            node_id.clone(),
1844            Node {
1845                id: node_id,
1846                component,
1847                input: InputMapping {
1848                    mapping: node.operation_payload,
1849                },
1850                output: OutputMapping {
1851                    mapping: Value::Null,
1852                },
1853                routing,
1854                telemetry,
1855            },
1856        );
1857    }
1858
1859    Ok(Flow {
1860        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1861        id: flow_id,
1862        kind: runtime.kind,
1863        entrypoints,
1864        nodes,
1865        metadata: runtime.metadata.unwrap_or_default(),
1866    })
1867}
1868
1869fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1870    match kind {
1871        greentic_types::FlowKind::Messaging => "messaging",
1872        greentic_types::FlowKind::Event => "event",
1873        greentic_types::FlowKind::ComponentConfig => "component-config",
1874        greentic_types::FlowKind::Job => "job",
1875        greentic_types::FlowKind::Http => "http",
1876    }
1877}
1878
1879fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1880    let mut file = archive
1881        .by_name(name)
1882        .with_context(|| format!("entry {name} missing from archive"))?;
1883    let mut buf = Vec::new();
1884    file.read_to_end(&mut buf)?;
1885    Ok(buf)
1886}
1887
1888fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1889    for node in doc.nodes.values_mut() {
1890        let Some((component_ref, payload)) = node
1891            .raw
1892            .iter()
1893            .next()
1894            .map(|(key, value)| (key.clone(), value.clone()))
1895        else {
1896            continue;
1897        };
1898        if component_ref.starts_with("emit.") {
1899            node.operation = Some(component_ref);
1900            node.payload = payload;
1901            node.raw.clear();
1902            continue;
1903        }
1904        let (target_component, operation, input, config) =
1905            infer_component_exec(&payload, &component_ref);
1906        let mut payload_obj = serde_json::Map::new();
1907        // component.exec is meta; ensure the payload carries the actual target component.
1908        payload_obj.insert("component".into(), Value::String(target_component));
1909        payload_obj.insert("operation".into(), Value::String(operation));
1910        payload_obj.insert("input".into(), input);
1911        if let Some(cfg) = config {
1912            payload_obj.insert("config".into(), cfg);
1913        }
1914        node.operation = Some("component.exec".to_string());
1915        node.payload = Value::Object(payload_obj);
1916        node.raw.clear();
1917    }
1918    doc
1919}
1920
1921fn infer_component_exec(
1922    payload: &Value,
1923    component_ref: &str,
1924) -> (String, String, Value, Option<Value>) {
1925    let default_op = if component_ref.starts_with("templating.") {
1926        "render"
1927    } else {
1928        "invoke"
1929    }
1930    .to_string();
1931
1932    if let Value::Object(map) = payload {
1933        let op = map
1934            .get("op")
1935            .or_else(|| map.get("operation"))
1936            .and_then(Value::as_str)
1937            .map(|s| s.to_string())
1938            .unwrap_or_else(|| default_op.clone());
1939
1940        let mut input = map.clone();
1941        let config = input.remove("config");
1942        let component = input
1943            .get("component")
1944            .or_else(|| input.get("component_ref"))
1945            .and_then(Value::as_str)
1946            .map(|s| s.to_string())
1947            .unwrap_or_else(|| component_ref.to_string());
1948        input.remove("component");
1949        input.remove("component_ref");
1950        input.remove("op");
1951        input.remove("operation");
1952        return (component, op, Value::Object(input), config);
1953    }
1954
1955    (component_ref.to_string(), default_op, payload.clone(), None)
1956}
1957
/// Identity of a component the pack expects to load.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component id (manifest id, lock/source component id or name, or legacy name).
    id: String,
    /// Component version; `"0.0.0"` when derived from a lock file or source table.
    version: String,
    /// Wasm path from a legacy manifest, relative to the pack root; `None` otherwise.
    legacy_path: Option<String>,
}
1964
/// Where a component's bytes come from and which digests they are expected to match.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Declared digest of the component: the resolved digest from the
    /// component sources extension, or the normalized hash/digest from `pack.lock`.
    digest: Option<String>,
    /// Parsed source reference of the component.
    source: ComponentSourceRef,
    /// Whether the wasm is bundled with the pack or lives remotely.
    artifact: ComponentArtifactLocation,
    /// Expected SHA-256 of a bundled wasm file (`sha256:<hex>`); only set for
    /// bundled `pack.lock` entries that record a hash.
    expected_wasm_sha256: Option<String>,
    /// Set when a bundled `pack.lock` entry has no hash and missing hashes are allowed.
    // NOTE(review): presumably tells the loader to skip digest checks — confirm at the use site.
    skip_digest_verification: bool,
}
1973
/// Location of a component's wasm artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// Wasm bundled with the pack; `wasm_path` is resolved against the
    /// materialized pack root or looked up as an archive entry name.
    Inline { wasm_path: String },
    /// Wasm not bundled with the pack.
    // NOTE(review): presumably fetched through the distributor client — confirm in the loader.
    Remote,
}
1979
/// Deserialized `pack.lock` / `pack.lock.json` document (schema version 1).
#[derive(Clone, Debug, Deserialize)]
struct PackLockV1 {
    /// Lock file schema version; `load_pack_lock` rejects anything other than 1.
    schema_version: u32,
    /// Locked components.
    components: Vec<PackLockComponent>,
}
1985
/// One component entry of a `pack.lock` file.
///
/// Several fields exist in a current and a legacy spelling; when both are
/// present, `component_sources_table_from_pack_lock` requires them to agree.
#[derive(Clone, Debug, Deserialize)]
struct PackLockComponent {
    /// Component name; must be unique within the lock file.
    name: String,
    /// Source reference (current key `source_ref`).
    #[serde(default, rename = "source_ref")]
    source_ref: Option<String>,
    /// Source reference (legacy key `ref`).
    #[serde(default, rename = "ref")]
    legacy_ref: Option<String>,
    /// Optional component id; used as an additional lookup key next to `name`.
    #[serde(default)]
    component_id: Option<ComponentId>,
    /// Whether the component is bundled with the pack; a bundled path also implies bundling.
    #[serde(default)]
    bundled: Option<bool>,
    /// Path of the bundled wasm (current key `bundled_path`).
    #[serde(default, rename = "bundled_path")]
    bundled_path: Option<String>,
    /// Path of the bundled wasm (legacy key `path`).
    #[serde(default, rename = "path")]
    legacy_path: Option<String>,
    /// SHA-256 of the bundled wasm (current key `wasm_sha256`).
    #[serde(default)]
    wasm_sha256: Option<String>,
    /// SHA-256 of the bundled wasm (legacy key `sha256`).
    #[serde(default, rename = "sha256")]
    legacy_sha256: Option<String>,
    /// Resolved digest of a remote component; preferred over `digest`.
    #[serde(default)]
    resolved_digest: Option<String>,
    /// Fallback digest of a remote component when `resolved_digest` is absent.
    #[serde(default)]
    digest: Option<String>,
}
2010
2011fn component_specs(
2012    manifest: Option<&greentic_types::PackManifest>,
2013    legacy_manifest: Option<&legacy_pack::PackManifest>,
2014    component_sources: Option<&ComponentSourcesV1>,
2015    pack_lock: Option<&PackLockV1>,
2016) -> Vec<ComponentSpec> {
2017    if let Some(manifest) = manifest {
2018        if !manifest.components.is_empty() {
2019            return manifest
2020                .components
2021                .iter()
2022                .map(|entry| ComponentSpec {
2023                    id: entry.id.as_str().to_string(),
2024                    version: entry.version.to_string(),
2025                    legacy_path: None,
2026                })
2027                .collect();
2028        }
2029        if let Some(lock) = pack_lock {
2030            let mut seen = HashSet::new();
2031            let mut specs = Vec::new();
2032            for entry in &lock.components {
2033                let id = entry
2034                    .component_id
2035                    .as_ref()
2036                    .map(|id| id.as_str())
2037                    .unwrap_or(entry.name.as_str());
2038                if seen.insert(id.to_string()) {
2039                    specs.push(ComponentSpec {
2040                        id: id.to_string(),
2041                        version: "0.0.0".to_string(),
2042                        legacy_path: None,
2043                    });
2044                }
2045            }
2046            return specs;
2047        }
2048        if let Some(sources) = component_sources {
2049            let mut seen = HashSet::new();
2050            let mut specs = Vec::new();
2051            for entry in &sources.components {
2052                let id = entry
2053                    .component_id
2054                    .as_ref()
2055                    .map(|id| id.as_str())
2056                    .unwrap_or(entry.name.as_str());
2057                if seen.insert(id.to_string()) {
2058                    specs.push(ComponentSpec {
2059                        id: id.to_string(),
2060                        version: "0.0.0".to_string(),
2061                        legacy_path: None,
2062                    });
2063                }
2064            }
2065            return specs;
2066        }
2067    }
2068    if let Some(legacy_manifest) = legacy_manifest {
2069        return legacy_manifest
2070            .components
2071            .iter()
2072            .map(|entry| ComponentSpec {
2073                id: entry.name.clone(),
2074                version: entry.version.to_string(),
2075                legacy_path: Some(entry.file_wasm.clone()),
2076            })
2077            .collect();
2078    }
2079    Vec::new()
2080}
2081
2082fn component_sources_table(
2083    sources: Option<&ComponentSourcesV1>,
2084) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2085    let Some(sources) = sources else {
2086        return Ok(None);
2087    };
2088    let mut table = HashMap::new();
2089    for entry in &sources.components {
2090        let artifact = match &entry.artifact {
2091            ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2092                wasm_path: wasm_path.clone(),
2093            },
2094            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2095        };
2096        let info = ComponentSourceInfo {
2097            digest: Some(entry.resolved.digest.clone()),
2098            source: entry.source.clone(),
2099            artifact,
2100            expected_wasm_sha256: None,
2101            skip_digest_verification: false,
2102        };
2103        if let Some(component_id) = entry.component_id.as_ref() {
2104            table.insert(component_id.as_str().to_string(), info.clone());
2105        }
2106        table.insert(entry.name.clone(), info);
2107    }
2108    Ok(Some(table))
2109}
2110
2111fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2112    let lock_path = if path.is_dir() {
2113        let candidate = path.join("pack.lock");
2114        if candidate.exists() {
2115            Some(candidate)
2116        } else {
2117            let candidate = path.join("pack.lock.json");
2118            candidate.exists().then_some(candidate)
2119        }
2120    } else {
2121        None
2122    };
2123    let Some(lock_path) = lock_path else {
2124        return Ok(None);
2125    };
2126    let raw = std::fs::read_to_string(&lock_path)
2127        .with_context(|| format!("failed to read {}", lock_path.display()))?;
2128    let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2129    if lock.schema_version != 1 {
2130        bail!("pack.lock schema_version must be 1");
2131    }
2132    Ok(Some(lock))
2133}
2134
/// Lists the directories in which a `pack.lock` should be searched for.
///
/// * For a directory pack, the pack directory itself.
/// * Otherwise the parent and grandparent (where they exist) of
///   `archive_hint`, or of `pack_path` when no hint is given. A hint without
///   a parent yields no roots; `pack_path` is not consulted in that case.
///
/// Note that the parent of a bare relative file name is the empty path.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    // `ancestors()` starts with the path itself, so skip it and keep the
    // next two levels: parent, then grandparent.
    archive_hint
        .unwrap_or(pack_path)
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
2159
2160fn normalize_sha256(digest: &str) -> Result<String> {
2161    let trimmed = digest.trim();
2162    if trimmed.is_empty() {
2163        bail!("sha256 digest cannot be empty");
2164    }
2165    if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2166        if stripped.is_empty() {
2167            bail!("sha256 digest must include hex bytes after sha256:");
2168        }
2169        return Ok(trimmed.to_string());
2170    }
2171    if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2172        return Ok(format!("sha256:{trimmed}"));
2173    }
2174    bail!("sha256 digest must be hex or sha256:<hex>");
2175}
2176
/// Builds a component source lookup table from a parsed `pack.lock`.
///
/// Every entry is registered under its `component_id` (when present) and
/// under its `name` (when that differs from the id). Fields that exist in a
/// current and a legacy spelling may both be set only if they agree.
///
/// `allow_missing_hash` permits bundled components without a recorded wasm
/// hash; such entries are flagged with `skip_digest_verification`.
///
/// # Errors
/// Fails when:
/// * a component name or component id is duplicated;
/// * `source_ref`/`ref`, `bundled_path`/`path` or `wasm_sha256`/`sha256` conflict;
/// * no source reference is present or it cannot be parsed;
/// * a component is marked bundled without a bundled path;
/// * a bundled hash is malformed, or missing while `allow_missing_hash` is false;
/// * a non-bundled component uses a tag reference or has no resolved digest.
fn component_sources_table_from_pack_lock(
    lock: &PackLockV1,
    allow_missing_hash: bool,
) -> Result<HashMap<String, ComponentSourceInfo>> {
    let mut table = HashMap::new();
    let mut names = HashSet::new();
    for entry in &lock.components {
        // Component names must be unique across the lock file.
        if !names.insert(entry.name.clone()) {
            bail!(
                "pack.lock contains duplicate component name `{}`",
                entry.name
            );
        }
        // Resolve the source reference from the current (`source_ref`) or
        // legacy (`ref`) key; both may be present only if identical.
        let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
            (Some(primary), Some(legacy)) => {
                if primary != legacy {
                    bail!(
                        "pack.lock component {} has conflicting refs: {} vs {}",
                        entry.name,
                        primary,
                        legacy
                    );
                }
                primary.as_str()
            }
            (Some(primary), None) => primary.as_str(),
            (None, Some(legacy)) => legacy.as_str(),
            (None, None) => {
                bail!("pack.lock component {} missing source_ref", entry.name);
            }
        };
        let source: ComponentSourceRef = source_ref
            .parse()
            .with_context(|| format!("invalid component ref `{}`", source_ref))?;
        // Same current/legacy reconciliation for the bundled wasm path.
        let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
            (Some(primary), Some(legacy)) => {
                if primary != legacy {
                    bail!(
                        "pack.lock component {} has conflicting bundled paths: {} vs {}",
                        entry.name,
                        primary,
                        legacy
                    );
                }
                Some(primary.clone())
            }
            (Some(primary), None) => Some(primary.clone()),
            (None, Some(legacy)) => Some(legacy.clone()),
            (None, None) => None,
        };
        // A component counts as bundled when flagged so or when a path is recorded.
        let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
        let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
            // Reached without a path only when `bundled: true` was set explicitly.
            let wasm_path = bundled_path.ok_or_else(|| {
                anyhow!(
                    "pack.lock component {} marked bundled but bundled_path is missing",
                    entry.name
                )
            })?;
            // Reconcile the current (`wasm_sha256`) and legacy (`sha256`) hash keys.
            let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
                (Some(primary), Some(legacy)) => {
                    if primary != legacy {
                        bail!(
                            "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
                            entry.name,
                            primary,
                            legacy
                        );
                    }
                    Some(primary.as_str())
                }
                (Some(primary), None) => Some(primary.as_str()),
                (None, Some(legacy)) => Some(legacy.as_str()),
                (None, None) => None,
            };
            let expected = match expected_raw {
                Some(value) => Some(normalize_sha256(value)?),
                None => None,
            };
            if expected.is_none() && !allow_missing_hash {
                bail!(
                    "pack.lock component {} missing wasm_sha256 for bundled component",
                    entry.name
                );
            }
            // For bundled components the wasm hash doubles as the declared digest;
            // verification is skipped only when no hash exists and that is allowed.
            (
                ComponentArtifactLocation::Inline { wasm_path },
                expected.clone(),
                expected,
                allow_missing_hash && expected_raw.is_none(),
            )
        } else {
            // A tag reference is rejected for non-bundled components.
            if source.is_tag() {
                bail!(
                    "component {} uses tag ref {} but is not bundled; rebuild the pack",
                    entry.name,
                    source
                );
            }
            // Prefer `resolved_digest`, falling back to `digest`.
            let expected = entry
                .resolved_digest
                .as_deref()
                .or(entry.digest.as_deref())
                .ok_or_else(|| {
                    anyhow!(
                        "pack.lock component {} missing resolved_digest for remote component",
                        entry.name
                    )
                })?;
            (
                ComponentArtifactLocation::Remote,
                Some(normalize_digest(expected)),
                None,
                false,
            )
        };
        let info = ComponentSourceInfo {
            digest,
            source,
            artifact,
            expected_wasm_sha256,
            skip_digest_verification,
        };
        // Register under the component id; the key must not already be in the table.
        if let Some(component_id) = entry.component_id.as_ref() {
            let key = component_id.as_str().to_string();
            if table.contains_key(&key) {
                bail!(
                    "pack.lock contains duplicate component id `{}`",
                    component_id.as_str()
                );
            }
            table.insert(key, info.clone());
        }
        // Also register under the name unless it is the same key as the id.
        if entry.name
            != entry
                .component_id
                .as_ref()
                .map(|id| id.as_str())
                .unwrap_or("")
        {
            table.insert(entry.name.clone(), info);
        }
    }
    Ok(table)
}
2321
2322fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2323    if let Some(path) = &spec.legacy_path {
2324        return root.join(path);
2325    }
2326    root.join("components").join(format!("{}.wasm", spec.id))
2327}
2328
/// Ensures a digest carries an algorithm prefix.
///
/// Digests already starting with `sha256:` or `blake3:` are returned
/// unchanged; anything else is assumed to be SHA-256 and gets the `sha256:`
/// prefix prepended.
fn normalize_digest(digest: &str) -> String {
    const KNOWN_PREFIXES: [&str; 2] = ["sha256:", "blake3:"];

    let already_prefixed = KNOWN_PREFIXES
        .iter()
        .any(|prefix| digest.starts_with(*prefix));
    if already_prefixed {
        digest.to_owned()
    } else {
        format!("sha256:{digest}")
    }
}
2336
2337fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2338    if digest.starts_with("blake3:") {
2339        let hash = blake3::hash(bytes);
2340        return Ok(format!("blake3:{}", hash.to_hex()));
2341    }
2342    let mut hasher = sha2::Sha256::new();
2343    hasher.update(bytes);
2344    Ok(format!("sha256:{:x}", hasher.finalize()))
2345}
2346
2347fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2348    let mut hasher = sha2::Sha256::new();
2349    hasher.update(bytes);
2350    format!("sha256:{:x}", hasher.finalize())
2351}
2352
2353fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2354    let wasm_digest = digest
2355        .map(normalize_digest)
2356        .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2357    ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2358}
2359
2360async fn compile_component_with_cache(
2361    cache: &CacheManager,
2362    engine: &Engine,
2363    digest: Option<&str>,
2364    bytes: Vec<u8>,
2365) -> Result<Arc<Component>> {
2366    let key = build_artifact_key(cache, digest, &bytes);
2367    cache.get_component(engine, &key, || Ok(bytes)).await
2368}
2369
2370fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2371    let normalized_expected = normalize_digest(expected);
2372    let actual = compute_digest_for(bytes, &normalized_expected)?;
2373    if normalize_digest(&actual) != normalized_expected {
2374        bail!(
2375            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2376        );
2377    }
2378    Ok(())
2379}
2380
2381fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2382    let normalized_expected = normalize_sha256(expected)?;
2383    let actual = compute_sha256_digest_for(bytes);
2384    if actual != normalized_expected {
2385        bail!(
2386            "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2387        );
2388    }
2389    Ok(())
2390}
2391
/// Tests for `pack.lock` handling and bundled component verification.
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    /// A non-bundled component whose source reference is a tag must be
    /// rejected with an error that tells the user to rebuild the pack.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![PackLockComponent {
                name: "templates".to_string(),
                source_ref: Some("oci://registry.test/templates:latest".to_string()),
                legacy_ref: None,
                component_id: None,
                bundled: Some(false),
                bundled_path: None,
                legacy_path: None,
                wasm_sha256: None,
                legacy_sha256: None,
                resolved_digest: None,
                digest: None,
            }],
        };
        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
        assert!(
            err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
            "unexpected error: {err}"
        );
    }

    /// Loading a bundled component whose bytes do not match the expected
    /// wasm hash must fail with a "bundled digest mismatch" error.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let temp = TempDir::new().expect("temp dir");
        let engine = Engine::default();
        let engine_profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        let cache_config = CacheConfig {
            root: temp.path().join("cache"),
            ..CacheConfig::default()
        };
        let cache = CacheManager::new(cache_config, engine_profile);
        // Copy a real fixture component into the temp dir as the bundled wasm.
        let wasm_path = temp.path().join("component.wasm");
        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
        std::fs::write(&wasm_path, &bytes).expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::new();
        missing.insert(spec.id.clone());

        // Declare a hash that cannot match the fixture bytes.
        let mut sources = HashMap::new();
        sources.insert(
            spec.id.clone(),
            ComponentSourceInfo {
                digest: Some("sha256:deadbeef".to_string()),
                source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
                artifact: ComponentArtifactLocation::Inline {
                    wasm_path: "component.wasm".to_string(),
                },
                expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
                skip_digest_verification: false,
            },
        );

        let mut loaded = HashMap::new();
        let result = rt.block_on(load_components_from_sources(
            &cache,
            &engine,
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(temp.path()),
            None,
        ));
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
2481
2482fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
2483    let mut opts = DistOptions {
2484        allow_tags: true,
2485        ..DistOptions::default()
2486    };
2487    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2488        opts.cache_dir = cache_dir;
2489    }
2490    if component_resolution.dist_offline {
2491        opts.offline = true;
2492    }
2493    opts
2494}
2495
2496#[allow(clippy::too_many_arguments)]
2497async fn load_components_from_sources(
2498    cache: &CacheManager,
2499    engine: &Engine,
2500    component_sources: &HashMap<String, ComponentSourceInfo>,
2501    component_resolution: &ComponentResolution,
2502    specs: &[ComponentSpec],
2503    missing: &mut HashSet<String>,
2504    into: &mut HashMap<String, PackComponent>,
2505    materialized_root: Option<&Path>,
2506    archive_hint: Option<&Path>,
2507) -> Result<()> {
2508    let mut archive = if let Some(path) = archive_hint {
2509        Some(
2510            ZipArchive::new(File::open(path)?)
2511                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
2512        )
2513    } else {
2514        None
2515    };
2516    let mut dist_client: Option<DistClient> = None;
2517
2518    for spec in specs {
2519        if !missing.contains(&spec.id) {
2520            continue;
2521        }
2522        let Some(source) = component_sources.get(&spec.id) else {
2523            continue;
2524        };
2525
2526        let bytes = match &source.artifact {
2527            ComponentArtifactLocation::Inline { wasm_path } => {
2528                if let Some(root) = materialized_root {
2529                    let path = root.join(wasm_path);
2530                    if path.exists() {
2531                        std::fs::read(&path).with_context(|| {
2532                            format!(
2533                                "failed to read inline component {} from {}",
2534                                spec.id,
2535                                path.display()
2536                            )
2537                        })?
2538                    } else if archive.is_none() {
2539                        bail!("inline component {} missing at {}", spec.id, path.display());
2540                    } else {
2541                        read_entry(
2542                            archive.as_mut().expect("archive present when needed"),
2543                            wasm_path,
2544                        )
2545                        .with_context(|| {
2546                            format!(
2547                                "inline component {} missing at {} in pack archive",
2548                                spec.id, wasm_path
2549                            )
2550                        })?
2551                    }
2552                } else if let Some(archive) = archive.as_mut() {
2553                    read_entry(archive, wasm_path).with_context(|| {
2554                        format!(
2555                            "inline component {} missing at {} in pack archive",
2556                            spec.id, wasm_path
2557                        )
2558                    })?
2559                } else {
2560                    bail!(
2561                        "inline component {} missing and no pack source available",
2562                        spec.id
2563                    );
2564                }
2565            }
2566            ComponentArtifactLocation::Remote => {
2567                if source.source.is_tag() {
2568                    bail!(
2569                        "component {} uses tag ref {} but is not bundled; rebuild the pack",
2570                        spec.id,
2571                        source.source
2572                    );
2573                }
2574                let client = dist_client.get_or_insert_with(|| {
2575                    DistClient::new(dist_options_from(component_resolution))
2576                });
2577                let reference = source.source.to_string();
2578                let digest = source.digest.as_deref().ok_or_else(|| {
2579                    anyhow!(
2580                        "component {} missing expected digest for remote component",
2581                        spec.id
2582                    )
2583                })?;
2584                let cache_path = if component_resolution.dist_offline {
2585                    client
2586                        .fetch_digest(digest)
2587                        .await
2588                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2589                } else {
2590                    let resolved = client
2591                        .resolve_ref(&reference)
2592                        .await
2593                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2594                    let expected = normalize_digest(digest);
2595                    let actual = normalize_digest(&resolved.digest);
2596                    if expected != actual {
2597                        bail!(
2598                            "component {} digest mismatch after fetch: expected {}, got {}",
2599                            spec.id,
2600                            expected,
2601                            actual
2602                        );
2603                    }
2604                    resolved.cache_path.ok_or_else(|| {
2605                        anyhow!(
2606                            "component {} resolved from {} but cache path is missing",
2607                            spec.id,
2608                            reference
2609                        )
2610                    })?
2611                };
2612                std::fs::read(&cache_path).with_context(|| {
2613                    format!(
2614                        "failed to read cached component {} from {}",
2615                        spec.id,
2616                        cache_path.display()
2617                    )
2618                })?
2619            }
2620        };
2621
2622        if let Some(expected) = source.expected_wasm_sha256.as_deref() {
2623            verify_wasm_sha256(&spec.id, expected, &bytes)?;
2624        } else if source.skip_digest_verification {
2625            let actual = compute_sha256_digest_for(&bytes);
2626            warn!(
2627                component_id = %spec.id,
2628                digest = %actual,
2629                "bundled component missing wasm_sha256; allowing due to flag"
2630            );
2631        } else {
2632            let expected = source.digest.as_deref().ok_or_else(|| {
2633                anyhow!(
2634                    "component {} missing expected digest for verification",
2635                    spec.id
2636                )
2637            })?;
2638            verify_component_digest(&spec.id, expected, &bytes)?;
2639        }
2640        let component =
2641            compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
2642                .await
2643                .with_context(|| format!("failed to compile component {}", spec.id))?;
2644        into.insert(
2645            spec.id.clone(),
2646            PackComponent {
2647                name: spec.id.clone(),
2648                version: spec.version.clone(),
2649                component,
2650            },
2651        );
2652        missing.remove(&spec.id);
2653    }
2654
2655    Ok(())
2656}
2657
2658fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2659    match err {
2660        DistError::CacheMiss { reference: missing } => anyhow!(
2661            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2662            component_id,
2663            missing,
2664            reference
2665        ),
2666        DistError::Offline { reference: blocked } => anyhow!(
2667            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2668            component_id,
2669            blocked,
2670            reference
2671        ),
2672        DistError::AuthRequired { target } => anyhow!(
2673            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2674            component_id,
2675            target,
2676            reference
2677        ),
2678        other => anyhow!(
2679            "failed to resolve component {} from {}: {}",
2680            component_id,
2681            reference,
2682            other
2683        ),
2684    }
2685}
2686
2687async fn load_components_from_overrides(
2688    cache: &CacheManager,
2689    engine: &Engine,
2690    overrides: &HashMap<String, PathBuf>,
2691    specs: &[ComponentSpec],
2692    missing: &mut HashSet<String>,
2693    into: &mut HashMap<String, PackComponent>,
2694) -> Result<()> {
2695    for spec in specs {
2696        if !missing.contains(&spec.id) {
2697            continue;
2698        }
2699        let Some(path) = overrides.get(&spec.id) else {
2700            continue;
2701        };
2702        let bytes = std::fs::read(path)
2703            .with_context(|| format!("failed to read override component {}", path.display()))?;
2704        let component = compile_component_with_cache(cache, engine, None, bytes)
2705            .await
2706            .with_context(|| {
2707                format!(
2708                    "failed to compile component {} from override {}",
2709                    spec.id,
2710                    path.display()
2711                )
2712            })?;
2713        into.insert(
2714            spec.id.clone(),
2715            PackComponent {
2716                name: spec.id.clone(),
2717                version: spec.version.clone(),
2718                component,
2719            },
2720        );
2721        missing.remove(&spec.id);
2722    }
2723    Ok(())
2724}
2725
2726async fn load_components_from_dir(
2727    cache: &CacheManager,
2728    engine: &Engine,
2729    root: &Path,
2730    specs: &[ComponentSpec],
2731    missing: &mut HashSet<String>,
2732    into: &mut HashMap<String, PackComponent>,
2733) -> Result<()> {
2734    for spec in specs {
2735        if !missing.contains(&spec.id) {
2736            continue;
2737        }
2738        let path = component_path_for_spec(root, spec);
2739        if !path.exists() {
2740            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2741            continue;
2742        }
2743        let bytes = std::fs::read(&path)
2744            .with_context(|| format!("failed to read component {}", path.display()))?;
2745        let component = compile_component_with_cache(cache, engine, None, bytes)
2746            .await
2747            .with_context(|| {
2748                format!(
2749                    "failed to compile component {} from {}",
2750                    spec.id,
2751                    path.display()
2752                )
2753            })?;
2754        into.insert(
2755            spec.id.clone(),
2756            PackComponent {
2757                name: spec.id.clone(),
2758                version: spec.version.clone(),
2759                component,
2760            },
2761        );
2762        missing.remove(&spec.id);
2763    }
2764    Ok(())
2765}
2766
2767async fn load_components_from_archive(
2768    cache: &CacheManager,
2769    engine: &Engine,
2770    path: &Path,
2771    specs: &[ComponentSpec],
2772    missing: &mut HashSet<String>,
2773    into: &mut HashMap<String, PackComponent>,
2774) -> Result<()> {
2775    let mut archive = ZipArchive::new(File::open(path)?)
2776        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2777    for spec in specs {
2778        if !missing.contains(&spec.id) {
2779            continue;
2780        }
2781        let file_name = spec
2782            .legacy_path
2783            .clone()
2784            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2785        let bytes = match read_entry(&mut archive, &file_name) {
2786            Ok(bytes) => bytes,
2787            Err(err) => {
2788                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2789                continue;
2790            }
2791        };
2792        let component = compile_component_with_cache(cache, engine, None, bytes)
2793            .await
2794            .with_context(|| format!("failed to compile component {}", spec.id))?;
2795        into.insert(
2796            spec.id.clone(),
2797            PackComponent {
2798                name: spec.id.clone(),
2799                version: spec.version.clone(),
2800                component,
2801            },
2802        );
2803        missing.remove(&spec.id);
2804    }
2805    Ok(())
2806}
2807
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node written in raw `<component>: <payload>` form must come out of
    /// `normalize_flow_doc` as a `component.exec` node carrying the component name, the
    /// `render` operation and the original payload as its input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let template = json!({ "template": "Hi {{name}}" });

        let mut raw = IndexMap::new();
        raw.insert("templating.handlebars".into(), template.clone());
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let normalized = normalize_flow_doc(FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            nodes,
        });

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component").and_then(Value::as_str),
            Some("templating.handlebars")
        );
        assert_eq!(
            payload.get("operation").and_then(Value::as_str),
            Some("render")
        );
        assert_eq!(payload.get("input"), Some(&template));
    }
}
2861
/// Summary information about a pack: its identity, its flows and the secrets it declares.
///
/// Values are produced by the constructors on this type (from a wasm binary's embedded
/// manifest, from a `PackManifest`, or as a path-derived fallback).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Pack version, stored as a string.
    pub version: String,
    /// Ids of the pack's entry flows; deserializes to an empty list when absent.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secrets the pack declares it needs; deserializes to an empty list when absent.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
2871
2872impl PackMetadata {
2873    fn from_wasm(bytes: &[u8]) -> Option<Self> {
2874        let parser = Parser::new(0);
2875        for payload in parser.parse_all(bytes) {
2876            let payload = payload.ok()?;
2877            match payload {
2878                Payload::CustomSection(section) => {
2879                    if section.name() == "greentic.manifest"
2880                        && let Ok(meta) = Self::from_bytes(section.data())
2881                    {
2882                        return Some(meta);
2883                    }
2884                }
2885                Payload::DataSection(reader) => {
2886                    for segment in reader.into_iter().flatten() {
2887                        if let Ok(meta) = Self::from_bytes(segment.data) {
2888                            return Some(meta);
2889                        }
2890                    }
2891                }
2892                _ => {}
2893            }
2894        }
2895        None
2896    }
2897
2898    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2899        #[derive(Deserialize)]
2900        struct RawManifest {
2901            pack_id: String,
2902            version: String,
2903            #[serde(default)]
2904            entry_flows: Vec<String>,
2905            #[serde(default)]
2906            flows: Vec<RawFlow>,
2907            #[serde(default)]
2908            secret_requirements: Vec<greentic_types::SecretRequirement>,
2909        }
2910
2911        #[derive(Deserialize)]
2912        struct RawFlow {
2913            id: String,
2914        }
2915
2916        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2917        let mut entry_flows = if manifest.entry_flows.is_empty() {
2918            manifest.flows.iter().map(|f| f.id.clone()).collect()
2919        } else {
2920            manifest.entry_flows.clone()
2921        };
2922        entry_flows.retain(|id| !id.is_empty());
2923        Ok(Self {
2924            pack_id: manifest.pack_id,
2925            version: manifest.version,
2926            entry_flows,
2927            secret_requirements: manifest.secret_requirements,
2928        })
2929    }
2930
2931    pub fn fallback(path: &Path) -> Self {
2932        let pack_id = path
2933            .file_stem()
2934            .map(|s| s.to_string_lossy().into_owned())
2935            .unwrap_or_else(|| "unknown-pack".to_string());
2936        Self {
2937            pack_id,
2938            version: "0.0.0".to_string(),
2939            entry_flows: Vec::new(),
2940            secret_requirements: Vec::new(),
2941        }
2942    }
2943
2944    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2945        let entry_flows = manifest
2946            .flows
2947            .iter()
2948            .map(|flow| flow.id.as_str().to_string())
2949            .collect::<Vec<_>>();
2950        Self {
2951            pack_id: manifest.pack_id.as_str().to_string(),
2952            version: manifest.version.to_string(),
2953            entry_flows,
2954            secret_requirements: manifest.secret_requirements.clone(),
2955        }
2956    }
2957}