Skip to main content

greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::{
10    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
13use crate::provider::{ProviderBinding, ProviderRegistry};
14use crate::provider_core::SchemaCorePre as ProviderComponentPre;
15use crate::provider_core_only;
16use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
17use anyhow::{Context, Result, anyhow, bail};
18use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20    self as host_v1, HostFns, add_all_v1_to_linker,
21    runner_host_http::RunnerHostHttp,
22    runner_host_kv::RunnerHostKv,
23    secrets_store::{SecretsError, SecretsStoreHost},
24    state_store::{
25        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26        StateStoreHost, TenantCtx as StateTenantCtx,
27    },
28    telemetry_logger::{
29        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31        TenantCtx as TelemetryTenantCtx,
32    },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::flow::FlowHasher;
38use greentic_types::{
39    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef,
40    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
41    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
42    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
43    pack_manifest::ExtensionInline,
44};
45use host_v1::http_client::{
46    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
47    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
48    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
49    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
50};
51use indexmap::IndexMap;
52use once_cell::sync::Lazy;
53use parking_lot::{Mutex, RwLock};
54use reqwest::blocking::Client as BlockingClient;
55use runner_core::normalize_under_root;
56use serde::{Deserialize, Serialize};
57use serde_cbor;
58use serde_json::{self, Value};
59use sha2::Digest;
60use tokio::fs;
61use wasmparser::{Parser, Payload};
62use wasmtime::StoreContextMut;
63use zip::ZipArchive;
64
65use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
66use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
67use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
68
69use crate::config::HostConfig;
70use crate::secrets::{DynSecretsManager, read_secret_blocking};
71use crate::storage::state::STATE_PREFIX;
72use crate::storage::{DynSessionStore, DynStateStore};
73use crate::verify;
74use crate::wasi::RunnerWasiPolicy;
75use tracing::warn;
76use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
77use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
78
79use greentic_flow::model::FlowDoc;
80
81#[allow(dead_code)]
82pub struct PackRuntime {
83    /// Component artifact path (wasm file).
84    path: PathBuf,
85    /// Optional archive (.gtpack) used to load flows/manifests.
86    archive_path: Option<PathBuf>,
87    config: Arc<HostConfig>,
88    engine: Engine,
89    metadata: PackMetadata,
90    manifest: Option<greentic_types::PackManifest>,
91    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
92    component_manifests: HashMap<String, ComponentManifest>,
93    mocks: Option<Arc<MockLayer>>,
94    flows: Option<PackFlows>,
95    components: HashMap<String, PackComponent>,
96    http_client: Arc<BlockingClient>,
97    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
98    session_store: Option<DynSessionStore>,
99    state_store: Option<DynStateStore>,
100    wasi_policy: Arc<RunnerWasiPolicy>,
101    provider_registry: RwLock<Option<ProviderRegistry>>,
102    secrets: DynSecretsManager,
103    oauth_config: Option<OAuthBrokerConfig>,
104}
105
106struct PackComponent {
107    #[allow(dead_code)]
108    name: String,
109    #[allow(dead_code)]
110    version: String,
111    component: Component,
112}
113
/// Options controlling where component wasm artifacts are resolved from.
///
/// `Default` yields no materialized root, no overrides, online resolution and
/// no explicit cache directory.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
    pub materialized_root: Option<PathBuf>,
    /// Explicit overrides mapping component id -> wasm path.
    pub overrides: HashMap<String, PathBuf>,
    /// If true, do not fetch remote components; require cached artifacts.
    pub dist_offline: bool,
    /// Optional cache directory for resolved remote components.
    pub dist_cache_dir: Option<PathBuf>,
}
125
126fn build_blocking_client() -> BlockingClient {
127    std::thread::spawn(|| {
128        BlockingClient::builder()
129            .no_proxy()
130            .build()
131            .expect("blocking client")
132    })
133    .join()
134    .expect("client build thread panicked")
135}
136
137fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
138    let (root, candidate) = if path.is_absolute() {
139        let parent = path
140            .parent()
141            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
142        let root = parent
143            .canonicalize()
144            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
145        let file = path
146            .file_name()
147            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
148        (root, PathBuf::from(file))
149    } else {
150        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
151        let base = if let Some(parent) = path.parent() {
152            cwd.join(parent)
153        } else {
154            cwd
155        };
156        let root = base
157            .canonicalize()
158            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
159        let file = path
160            .file_name()
161            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
162        (root, PathBuf::from(file))
163    };
164    let safe = normalize_under_root(&root, &candidate)?;
165    Ok((root, safe))
166}
167
168static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct FlowDescriptor {
172    pub id: String,
173    #[serde(rename = "type")]
174    pub flow_type: String,
175    pub profile: String,
176    pub version: String,
177    #[serde(default)]
178    pub description: Option<String>,
179}
180
181pub struct HostState {
182    config: Arc<HostConfig>,
183    http_client: Arc<BlockingClient>,
184    default_env: String,
185    #[allow(dead_code)]
186    session_store: Option<DynSessionStore>,
187    state_store: Option<DynStateStore>,
188    mocks: Option<Arc<MockLayer>>,
189    secrets: DynSecretsManager,
190    oauth_config: Option<OAuthBrokerConfig>,
191    oauth_host: OAuthBrokerHost,
192    exec_ctx: Option<ComponentExecCtx>,
193}
194
195impl HostState {
196    #[allow(clippy::default_constructed_unit_structs)]
197    #[allow(clippy::too_many_arguments)]
198    pub fn new(
199        config: Arc<HostConfig>,
200        http_client: Arc<BlockingClient>,
201        mocks: Option<Arc<MockLayer>>,
202        session_store: Option<DynSessionStore>,
203        state_store: Option<DynStateStore>,
204        secrets: DynSecretsManager,
205        oauth_config: Option<OAuthBrokerConfig>,
206        exec_ctx: Option<ComponentExecCtx>,
207    ) -> Result<Self> {
208        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
209        Ok(Self {
210            config,
211            http_client,
212            default_env,
213            session_store,
214            state_store,
215            mocks,
216            secrets,
217            oauth_config,
218            oauth_host: OAuthBrokerHost::default(),
219            exec_ctx,
220        })
221    }
222
223    pub fn get_secret(&self, key: &str) -> Result<String> {
224        if provider_core_only::is_enabled() {
225            bail!(provider_core_only::blocked_message("secrets"))
226        }
227        if !self.config.secrets_policy.is_allowed(key) {
228            bail!("secret {key} is not permitted by bindings policy");
229        }
230        if let Some(mock) = &self.mocks
231            && let Some(value) = mock.secrets_lookup(key)
232        {
233            return Ok(value);
234        }
235        let bytes = read_secret_blocking(&self.secrets, key)
236            .context("failed to read secret from manager")?;
237        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
238        Ok(value)
239    }
240
241    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
242        let tenant_raw = ctx
243            .as_ref()
244            .map(|ctx| ctx.tenant.clone())
245            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
246            .unwrap_or_else(|| self.config.tenant.clone());
247        let env_raw = ctx
248            .as_ref()
249            .map(|ctx| ctx.env.clone())
250            .unwrap_or_else(|| self.default_env.clone());
251        let tenant_id = TenantId::from_str(&tenant_raw)
252            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
253        let env_id = EnvId::from_str(&env_raw)
254            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
255        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
256        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
257            if let Some(team) = exec_ctx.tenant.team.as_ref() {
258                let team_id =
259                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
260                tenant_ctx = tenant_ctx.with_team(Some(team_id));
261            }
262            if let Some(user) = exec_ctx.tenant.user.as_ref() {
263                let user_id =
264                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
265                tenant_ctx = tenant_ctx.with_user(Some(user_id));
266            }
267            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
268            if let Some(node) = exec_ctx.node_id.as_ref() {
269                tenant_ctx = tenant_ctx.with_node(node.clone());
270            }
271            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
272                tenant_ctx = tenant_ctx.with_session(session.clone());
273            }
274            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
275        }
276
277        if let Some(ctx) = ctx {
278            if let Some(team) = ctx.team.or(ctx.team_id) {
279                let team_id =
280                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
281                tenant_ctx = tenant_ctx.with_team(Some(team_id));
282            }
283            if let Some(user) = ctx.user.or(ctx.user_id) {
284                let user_id =
285                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
286                tenant_ctx = tenant_ctx.with_user(Some(user_id));
287            }
288            if let Some(flow) = ctx.flow_id {
289                tenant_ctx = tenant_ctx.with_flow(flow);
290            }
291            if let Some(node) = ctx.node_id {
292                tenant_ctx = tenant_ctx.with_node(node);
293            }
294            if let Some(provider) = ctx.provider_id {
295                tenant_ctx = tenant_ctx.with_provider(provider);
296            }
297            if let Some(session) = ctx.session_id {
298                tenant_ctx = tenant_ctx.with_session(session);
299            }
300            tenant_ctx.trace_id = ctx.trace_id;
301        }
302        Ok(tenant_ctx)
303    }
304
305    fn send_http_request(
306        &mut self,
307        req: HttpRequest,
308        opts: Option<HttpRequestOptionsV1_1>,
309        _ctx: Option<HttpTenantCtx>,
310    ) -> Result<HttpResponse, HttpClientError> {
311        if !self.config.http_enabled {
312            return Err(HttpClientError {
313                code: "denied".into(),
314                message: "http client disabled by policy".into(),
315            });
316        }
317
318        let mut mock_state = None;
319        let raw_body = req.body.clone();
320        if let Some(mock) = &self.mocks
321            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
322        {
323            match mock.http_begin(&meta) {
324                HttpDecision::Mock(response) => {
325                    let headers = response
326                        .headers
327                        .iter()
328                        .map(|(k, v)| (k.clone(), v.clone()))
329                        .collect();
330                    return Ok(HttpResponse {
331                        status: response.status,
332                        headers,
333                        body: response.body.clone().map(|b| b.into_bytes()),
334                    });
335                }
336                HttpDecision::Deny(reason) => {
337                    return Err(HttpClientError {
338                        code: "denied".into(),
339                        message: reason,
340                    });
341                }
342                HttpDecision::Passthrough { record } => {
343                    mock_state = Some((meta, record));
344                }
345            }
346        }
347
348        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
349        let mut builder = self.http_client.request(method, &req.url);
350        for (key, value) in req.headers {
351            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
352                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
353            {
354                builder = builder.header(header, header_value);
355            }
356        }
357
358        if let Some(body) = raw_body.clone() {
359            builder = builder.body(body);
360        }
361
362        if let Some(opts) = opts {
363            if let Some(timeout_ms) = opts.timeout_ms {
364                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
365            }
366            if opts.allow_insecure == Some(true) {
367                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
368            }
369            if let Some(follow_redirects) = opts.follow_redirects
370                && !follow_redirects
371            {
372                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
373            }
374        }
375
376        let response = match builder.send() {
377            Ok(resp) => resp,
378            Err(err) => {
379                warn!(url = %req.url, error = %err, "http client request failed");
380                return Err(HttpClientError {
381                    code: "unavailable".into(),
382                    message: err.to_string(),
383                });
384            }
385        };
386
387        let status = response.status().as_u16();
388        let headers_vec = response
389            .headers()
390            .iter()
391            .map(|(k, v)| {
392                (
393                    k.as_str().to_string(),
394                    v.to_str().unwrap_or_default().to_string(),
395                )
396            })
397            .collect::<Vec<_>>();
398        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
399
400        if let Some((meta, true)) = mock_state.take()
401            && let Some(mock) = &self.mocks
402        {
403            let recorded = HttpMockResponse::new(
404                status,
405                headers_vec.clone().into_iter().collect(),
406                body_bytes
407                    .as_ref()
408                    .map(|b| String::from_utf8_lossy(b).into_owned()),
409            );
410            mock.http_record(&meta, &recorded);
411        }
412
413        Ok(HttpResponse {
414            status,
415            headers: headers_vec,
416            body: body_bytes,
417        })
418    }
419}
420
421impl SecretsStoreHost for HostState {
422    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
423        if provider_core_only::is_enabled() {
424            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
425            return Err(SecretsError::Denied);
426        }
427        if !self.config.secrets_policy.is_allowed(&key) {
428            return Err(SecretsError::Denied);
429        }
430        if let Some(mock) = &self.mocks
431            && let Some(value) = mock.secrets_lookup(&key)
432        {
433            return Ok(Some(value.into_bytes()));
434        }
435        match read_secret_blocking(&self.secrets, &key) {
436            Ok(bytes) => Ok(Some(bytes)),
437            Err(err) => {
438                warn!(secret = %key, error = %err, "secret lookup failed");
439                Err(SecretsError::NotFound)
440            }
441        }
442    }
443}
444
445impl HttpClientHost for HostState {
446    fn send(
447        &mut self,
448        req: HttpRequest,
449        ctx: Option<HttpTenantCtx>,
450    ) -> Result<HttpResponse, HttpClientError> {
451        self.send_http_request(req, None, ctx)
452    }
453}
454
455impl HttpClientHostV1_1 for HostState {
456    fn send(
457        &mut self,
458        req: HttpRequestV1_1,
459        opts: Option<HttpRequestOptionsV1_1>,
460        ctx: Option<HttpTenantCtxV1_1>,
461    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
462        let legacy_req = HttpRequest {
463            method: req.method,
464            url: req.url,
465            headers: req.headers,
466            body: req.body,
467        };
468        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
469            env: ctx.env,
470            tenant: ctx.tenant,
471            tenant_id: ctx.tenant_id,
472            team: ctx.team,
473            team_id: ctx.team_id,
474            user: ctx.user,
475            user_id: ctx.user_id,
476            trace_id: ctx.trace_id,
477            correlation_id: ctx.correlation_id,
478            attributes: ctx.attributes,
479            session_id: ctx.session_id,
480            flow_id: ctx.flow_id,
481            node_id: ctx.node_id,
482            provider_id: ctx.provider_id,
483            deadline_ms: ctx.deadline_ms,
484            attempt: ctx.attempt,
485            idempotency_key: ctx.idempotency_key,
486            impersonation: ctx
487                .impersonation
488                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
489                    actor_id,
490                    reason,
491                }),
492        });
493
494        self.send_http_request(legacy_req, opts, legacy_ctx)
495            .map(|resp| HttpResponseV1_1 {
496                status: resp.status,
497                headers: resp.headers,
498                body: resp.body,
499            })
500            .map_err(|err| HttpClientErrorV1_1 {
501                code: err.code,
502                message: err.message,
503            })
504    }
505}
506
507impl StateStoreHost for HostState {
508    fn read(
509        &mut self,
510        key: HostStateKey,
511        ctx: Option<StateTenantCtx>,
512    ) -> Result<Vec<u8>, StateError> {
513        let store = match self.state_store.as_ref() {
514            Some(store) => store.clone(),
515            None => {
516                return Err(StateError {
517                    code: "unavailable".into(),
518                    message: "state store not configured".into(),
519                });
520            }
521        };
522        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
523            Ok(ctx) => ctx,
524            Err(err) => {
525                return Err(StateError {
526                    code: "invalid-ctx".into(),
527                    message: err.to_string(),
528                });
529            }
530        };
531        let key = StoreStateKey::from(key);
532        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
533            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
534            Ok(None) => Err(StateError {
535                code: "not_found".into(),
536                message: "state key not found".into(),
537            }),
538            Err(err) => Err(StateError {
539                code: "internal".into(),
540                message: err.to_string(),
541            }),
542        }
543    }
544
545    fn write(
546        &mut self,
547        key: HostStateKey,
548        bytes: Vec<u8>,
549        ctx: Option<StateTenantCtx>,
550    ) -> Result<StateOpAck, StateError> {
551        let store = match self.state_store.as_ref() {
552            Some(store) => store.clone(),
553            None => {
554                return Err(StateError {
555                    code: "unavailable".into(),
556                    message: "state store not configured".into(),
557                });
558            }
559        };
560        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
561            Ok(ctx) => ctx,
562            Err(err) => {
563                return Err(StateError {
564                    code: "invalid-ctx".into(),
565                    message: err.to_string(),
566                });
567            }
568        };
569        let key = StoreStateKey::from(key);
570        let value = serde_json::from_slice(&bytes)
571            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
572        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
573            Ok(()) => Ok(StateOpAck::Ok),
574            Err(err) => Err(StateError {
575                code: "internal".into(),
576                message: err.to_string(),
577            }),
578        }
579    }
580
581    fn delete(
582        &mut self,
583        key: HostStateKey,
584        ctx: Option<StateTenantCtx>,
585    ) -> Result<StateOpAck, StateError> {
586        let store = match self.state_store.as_ref() {
587            Some(store) => store.clone(),
588            None => {
589                return Err(StateError {
590                    code: "unavailable".into(),
591                    message: "state store not configured".into(),
592                });
593            }
594        };
595        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
596            Ok(ctx) => ctx,
597            Err(err) => {
598                return Err(StateError {
599                    code: "invalid-ctx".into(),
600                    message: err.to_string(),
601                });
602            }
603        };
604        let key = StoreStateKey::from(key);
605        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
606            Ok(_) => Ok(StateOpAck::Ok),
607            Err(err) => Err(StateError {
608                code: "internal".into(),
609                message: err.to_string(),
610            }),
611        }
612    }
613}
614
615impl TelemetryLoggerHost for HostState {
616    fn log(
617        &mut self,
618        span: TelemetrySpanContext,
619        fields: Vec<(String, String)>,
620        _ctx: Option<TelemetryTenantCtx>,
621    ) -> Result<TelemetryAck, TelemetryError> {
622        if let Some(mock) = &self.mocks
623            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
624        {
625            return Ok(TelemetryAck::Ok);
626        }
627        let mut map = serde_json::Map::new();
628        for (k, v) in fields {
629            map.insert(k, Value::String(v));
630        }
631        tracing::info!(
632            tenant = %span.tenant,
633            flow_id = %span.flow_id,
634            node = ?span.node_id,
635            provider = %span.provider,
636            fields = %serde_json::Value::Object(map.clone()),
637            "telemetry log from pack"
638        );
639        Ok(TelemetryAck::Ok)
640    }
641}
642
643impl RunnerHostHttp for HostState {
644    fn request(
645        &mut self,
646        method: String,
647        url: String,
648        headers: Vec<String>,
649        body: Option<Vec<u8>>,
650    ) -> Result<Vec<u8>, String> {
651        let req = HttpRequest {
652            method,
653            url,
654            headers: headers
655                .chunks(2)
656                .filter_map(|chunk| {
657                    if chunk.len() == 2 {
658                        Some((chunk[0].clone(), chunk[1].clone()))
659                    } else {
660                        None
661                    }
662                })
663                .collect(),
664            body,
665        };
666        match HttpClientHost::send(self, req, None) {
667            Ok(resp) => Ok(resp.body.unwrap_or_default()),
668            Err(err) => Err(err.message),
669        }
670    }
671}
672
673impl RunnerHostKv for HostState {
674    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
675        None
676    }
677
678    fn put(&mut self, _ns: String, _key: String, _val: String) {}
679}
680
681enum ManifestLoad {
682    New {
683        manifest: Box<greentic_types::PackManifest>,
684        flows: PackFlows,
685    },
686    Legacy {
687        manifest: Box<legacy_pack::PackManifest>,
688        flows: PackFlows,
689    },
690}
691
692fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
693    let mut archive = ZipArchive::new(File::open(path)?)
694        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
695    let bytes = read_entry(&mut archive, "manifest.cbor")
696        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
697    match decode_pack_manifest(&bytes) {
698        Ok(manifest) => {
699            let cache = PackFlows::from_manifest(manifest.clone());
700            Ok(ManifestLoad::New {
701                manifest: Box::new(manifest),
702                flows: cache,
703            })
704        }
705        Err(err) => {
706            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
707            // Fall back to legacy pack manifest
708            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
709                .context("failed to decode legacy pack manifest from manifest.cbor")?;
710            let flows = load_legacy_flows(&mut archive, &legacy)?;
711            Ok(ManifestLoad::Legacy {
712                manifest: Box::new(legacy),
713                flows,
714            })
715        }
716    }
717}
718
719fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
720    let manifest_path = root.join("manifest.cbor");
721    let bytes = std::fs::read(&manifest_path)
722        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
723    match decode_pack_manifest(&bytes) {
724        Ok(manifest) => {
725            let cache = PackFlows::from_manifest(manifest.clone());
726            Ok(ManifestLoad::New {
727                manifest: Box::new(manifest),
728                flows: cache,
729            })
730        }
731        Err(err) => {
732            tracing::debug!(
733                error = %err,
734                pack = %root.display(),
735                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
736            );
737            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
738                .context("failed to decode legacy pack manifest from manifest.cbor")?;
739            let flows = load_legacy_flows_from_dir(root, &legacy)?;
740            Ok(ManifestLoad::Legacy {
741                manifest: Box::new(legacy),
742                flows,
743            })
744        }
745    }
746}
747
748fn load_legacy_flows(
749    archive: &mut ZipArchive<File>,
750    manifest: &legacy_pack::PackManifest,
751) -> Result<PackFlows> {
752    let mut flows = HashMap::new();
753    let mut descriptors = Vec::new();
754
755    for entry in &manifest.flows {
756        let bytes = read_entry(archive, &entry.file_json)
757            .with_context(|| format!("missing flow json {}", entry.file_json))?;
758        let doc: FlowDoc = serde_json::from_slice(&bytes)
759            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
760        let normalized = normalize_flow_doc(doc);
761        let flow_ir = flow_doc_to_ir(normalized)?;
762        let flow = flow_ir_to_flow(flow_ir)?;
763
764        descriptors.push(FlowDescriptor {
765            id: entry.id.clone(),
766            flow_type: entry.kind.clone(),
767            profile: manifest.meta.pack_id.clone(),
768            version: manifest.meta.version.to_string(),
769            description: None,
770        });
771        flows.insert(entry.id.clone(), flow);
772    }
773
774    let mut entry_flows = manifest.meta.entry_flows.clone();
775    if entry_flows.is_empty() {
776        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
777    }
778    let metadata = PackMetadata {
779        pack_id: manifest.meta.pack_id.clone(),
780        version: manifest.meta.version.to_string(),
781        entry_flows,
782        secret_requirements: Vec::new(),
783    };
784
785    Ok(PackFlows {
786        descriptors,
787        flows,
788        metadata,
789    })
790}
791
792fn load_legacy_flows_from_dir(
793    root: &Path,
794    manifest: &legacy_pack::PackManifest,
795) -> Result<PackFlows> {
796    let mut flows = HashMap::new();
797    let mut descriptors = Vec::new();
798
799    for entry in &manifest.flows {
800        let path = root.join(&entry.file_json);
801        let bytes = std::fs::read(&path)
802            .with_context(|| format!("missing flow json {}", path.display()))?;
803        let doc: FlowDoc = serde_json::from_slice(&bytes)
804            .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
805        let normalized = normalize_flow_doc(doc);
806        let flow_ir = flow_doc_to_ir(normalized)?;
807        let flow = flow_ir_to_flow(flow_ir)?;
808
809        descriptors.push(FlowDescriptor {
810            id: entry.id.clone(),
811            flow_type: entry.kind.clone(),
812            profile: manifest.meta.pack_id.clone(),
813            version: manifest.meta.version.to_string(),
814            description: None,
815        });
816        flows.insert(entry.id.clone(), flow);
817    }
818
819    let mut entry_flows = manifest.meta.entry_flows.clone();
820    if entry_flows.is_empty() {
821        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
822    }
823    let metadata = PackMetadata {
824        pack_id: manifest.meta.pack_id.clone(),
825        version: manifest.meta.version.to_string(),
826        entry_flows,
827        secret_requirements: Vec::new(),
828    };
829
830    Ok(PackFlows {
831        descriptors,
832        flows,
833        metadata,
834    })
835}
836
/// Store data for a component instance: the Greentic host state together
/// with the WASI context and resource table exposed through `WasiView`.
pub struct ComponentState {
    /// Host-side state that backs the Greentic host interfaces.
    pub host: HostState,
    /// WASI context produced by the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table handed out alongside `wasi_ctx` in the WASI view.
    resource_table: ResourceTable,
}
842
843impl ComponentState {
844    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
845        let wasi_ctx = policy
846            .instantiate()
847            .context("failed to build WASI context")?;
848        Ok(Self {
849            host,
850            wasi_ctx,
851            resource_table: ResourceTable::new(),
852        })
853    }
854
855    fn host_mut(&mut self) -> &mut HostState {
856        &mut self.host
857    }
858
859    fn should_cancel_host(&mut self) -> bool {
860        false
861    }
862
863    fn yield_now_host(&mut self) {
864        // no-op cooperative yield
865    }
866}
867
/// v0.4 control interface: both calls delegate to the version-independent
/// helpers on `ComponentState`.
impl component_api::v0_4::greentic::component::control::Host for ComponentState {
    /// Delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
877
/// v0.5 control interface: both calls delegate to the version-independent
/// helpers on `ComponentState`.
impl component_api::v0_5::greentic::component::control::Host for ComponentState {
    /// Delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
887
888fn add_component_control_instance(
889    linker: &mut Linker<ComponentState>,
890    name: &str,
891) -> wasmtime::Result<()> {
892    let mut inst = linker.instance(name)?;
893    inst.func_wrap(
894        "should-cancel",
895        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
896            let host = caller.data_mut();
897            Ok((host.should_cancel_host(),))
898        },
899    )?;
900    inst.func_wrap(
901        "yield-now",
902        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
903            let host = caller.data_mut();
904            host.yield_now_host();
905            Ok(())
906        },
907    )?;
908    Ok(())
909}
910
911fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
912    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
913    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
914    Ok(())
915}
916
/// Adds WASI and the v1 Greentic host interfaces to `linker`.
///
/// Every enabled interface resolves its host through
/// `ComponentState::host_mut`. The state store is only wired in when
/// `allow_state_store` is `true`; the OAuth broker is never registered here.
pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            // OAuth broker is left unregistered by this function.
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            // `None` when the caller did not grant state-store access.
            state_store: allow_state_store.then_some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
934
/// Exposes the OAuth-related pieces of the wrapped `HostState`.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// The host's default environment name.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the host's OAuth broker.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, if the host has one.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
952
/// Gives WASI access to this state's context and resource table.
impl WasiView for ComponentState {
    /// Borrows the WASI context and the resource table together as one view.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi_ctx,
            table: &mut self.resource_table,
        }
    }
}
961
// SAFETY: NOTE(review) — these impls promise the compiler that
// `ComponentState` (and therefore `HostState`, `WasiCtx` and `ResourceTable`)
// may be moved to and shared between threads. Nothing next to these impls
// states why that holds for every field — TODO confirm and record the
// invariant here.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
966
967impl PackRuntime {
968    fn allows_state_store(&self, component_ref: &str) -> bool {
969        if self.state_store.is_none() {
970            return false;
971        }
972        if !self.config.state_store_policy.allow {
973            return false;
974        }
975        let Some(manifest) = self.component_manifests.get(component_ref) else {
976            return false;
977        };
978        manifest
979            .capabilities
980            .host
981            .state
982            .as_ref()
983            .map(|caps| caps.read || caps.write)
984            .unwrap_or(false)
985    }
986
987    #[allow(clippy::too_many_arguments)]
988    pub async fn load(
989        path: impl AsRef<Path>,
990        config: Arc<HostConfig>,
991        mocks: Option<Arc<MockLayer>>,
992        archive_source: Option<&Path>,
993        session_store: Option<DynSessionStore>,
994        state_store: Option<DynStateStore>,
995        wasi_policy: Arc<RunnerWasiPolicy>,
996        secrets: DynSecretsManager,
997        oauth_config: Option<OAuthBrokerConfig>,
998        verify_archive: bool,
999        component_resolution: ComponentResolution,
1000    ) -> Result<Self> {
1001        let path = path.as_ref();
1002        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1003        let path_meta = std::fs::metadata(&safe_path).ok();
1004        let is_dir = path_meta
1005            .as_ref()
1006            .map(|meta| meta.is_dir())
1007            .unwrap_or(false);
1008        let is_component = !is_dir
1009            && safe_path
1010                .extension()
1011                .and_then(|ext| ext.to_str())
1012                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1013                .unwrap_or(false);
1014        let archive_hint_path = if let Some(source) = archive_source {
1015            let (_, normalized) = normalize_pack_path(source)?;
1016            Some(normalized)
1017        } else if is_component || is_dir {
1018            None
1019        } else {
1020            Some(safe_path.clone())
1021        };
1022        let archive_hint = archive_hint_path.as_deref();
1023        if verify_archive {
1024            if let Some(verify_target) = archive_hint.and_then(|p| {
1025                std::fs::metadata(p)
1026                    .ok()
1027                    .filter(|meta| meta.is_file())
1028                    .map(|_| p)
1029            }) {
1030                verify::verify_pack(verify_target).await?;
1031                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1032            } else {
1033                tracing::debug!("skipping archive verification (no archive source)");
1034            }
1035        }
1036        let engine = Engine::default();
1037        let mut metadata = PackMetadata::fallback(&safe_path);
1038        let mut manifest = None;
1039        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1040        let mut flows = None;
1041        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1042            if is_dir {
1043                Some(safe_path.clone())
1044            } else {
1045                None
1046            }
1047        });
1048
1049        if let Some(root) = materialized_root.as_ref() {
1050            match load_manifest_and_flows_from_dir(root) {
1051                Ok(ManifestLoad::New {
1052                    manifest: m,
1053                    flows: cache,
1054                }) => {
1055                    metadata = cache.metadata.clone();
1056                    manifest = Some(*m);
1057                    flows = Some(cache);
1058                }
1059                Ok(ManifestLoad::Legacy {
1060                    manifest: m,
1061                    flows: cache,
1062                }) => {
1063                    metadata = cache.metadata.clone();
1064                    legacy_manifest = Some(m);
1065                    flows = Some(cache);
1066                }
1067                Err(err) => {
1068                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1069                }
1070            }
1071        }
1072
1073        if manifest.is_none()
1074            && legacy_manifest.is_none()
1075            && let Some(archive_path) = archive_hint
1076        {
1077            match load_manifest_and_flows(archive_path) {
1078                Ok(ManifestLoad::New {
1079                    manifest: m,
1080                    flows: cache,
1081                }) => {
1082                    metadata = cache.metadata.clone();
1083                    manifest = Some(*m);
1084                    flows = Some(cache);
1085                }
1086                Ok(ManifestLoad::Legacy {
1087                    manifest: m,
1088                    flows: cache,
1089                }) => {
1090                    metadata = cache.metadata.clone();
1091                    legacy_manifest = Some(m);
1092                    flows = Some(cache);
1093                }
1094                Err(err) => {
1095                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1096                }
1097            }
1098        }
1099        let component_sources = if let Some(manifest) = manifest.as_ref() {
1100            component_sources_table(manifest).context("invalid component sources extension")?
1101        } else {
1102            None
1103        };
1104        let components = if is_component {
1105            let wasm_bytes = fs::read(&safe_path).await?;
1106            metadata = PackMetadata::from_wasm(&wasm_bytes)
1107                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1108            let name = safe_path
1109                .file_stem()
1110                .map(|s| s.to_string_lossy().to_string())
1111                .unwrap_or_else(|| "component".to_string());
1112            let component = Component::from_binary(&engine, &wasm_bytes)?;
1113            let mut map = HashMap::new();
1114            map.insert(
1115                name.clone(),
1116                PackComponent {
1117                    name,
1118                    version: metadata.version.clone(),
1119                    component,
1120                },
1121            );
1122            map
1123        } else {
1124            let specs = component_specs(manifest.as_ref(), legacy_manifest.as_deref());
1125            if specs.is_empty() {
1126                HashMap::new()
1127            } else {
1128                let mut loaded = HashMap::new();
1129                let mut missing: HashSet<String> =
1130                    specs.iter().map(|spec| spec.id.clone()).collect();
1131                let mut searched = Vec::new();
1132
1133                if !component_resolution.overrides.is_empty() {
1134                    load_components_from_overrides(
1135                        &engine,
1136                        &component_resolution.overrides,
1137                        &specs,
1138                        &mut missing,
1139                        &mut loaded,
1140                    )?;
1141                    searched.push("override map".to_string());
1142                }
1143
1144                if let Some(component_sources) = component_sources.as_ref() {
1145                    load_components_from_sources(
1146                        &engine,
1147                        component_sources,
1148                        &component_resolution,
1149                        &specs,
1150                        &mut missing,
1151                        &mut loaded,
1152                        materialized_root.as_deref(),
1153                        archive_hint,
1154                    )
1155                    .await?;
1156                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1157                }
1158
1159                if let Some(root) = materialized_root.as_ref() {
1160                    load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
1161                    searched.push(format!("components dir {}", root.display()));
1162                }
1163
1164                if let Some(archive_path) = archive_hint {
1165                    load_components_from_archive(
1166                        &engine,
1167                        archive_path,
1168                        &specs,
1169                        &mut missing,
1170                        &mut loaded,
1171                    )?;
1172                    searched.push(format!("archive {}", archive_path.display()));
1173                }
1174
1175                if !missing.is_empty() {
1176                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1177                    let sources = if searched.is_empty() {
1178                        "no component sources".to_string()
1179                    } else {
1180                        searched.join(", ")
1181                    };
1182                    bail!(
1183                        "components missing: {}; looked in {}",
1184                        missing_list,
1185                        sources
1186                    );
1187                }
1188
1189                loaded
1190            }
1191        };
1192        let http_client = Arc::clone(&HTTP_CLIENT);
1193        let mut component_manifests = HashMap::new();
1194        if let Some(manifest) = manifest.as_ref() {
1195            for component in &manifest.components {
1196                component_manifests.insert(component.id.as_str().to_string(), component.clone());
1197            }
1198        }
1199        Ok(Self {
1200            path: safe_path,
1201            archive_path: archive_hint.map(Path::to_path_buf),
1202            config,
1203            engine,
1204            metadata,
1205            manifest,
1206            legacy_manifest,
1207            component_manifests,
1208            mocks,
1209            flows,
1210            components,
1211            http_client,
1212            pre_cache: Mutex::new(HashMap::new()),
1213            session_store,
1214            state_store,
1215            wasi_policy,
1216            provider_registry: RwLock::new(None),
1217            secrets,
1218            oauth_config,
1219        })
1220    }
1221
1222    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1223        if let Some(cache) = &self.flows {
1224            return Ok(cache.descriptors.clone());
1225        }
1226        if let Some(manifest) = &self.manifest {
1227            let descriptors = manifest
1228                .flows
1229                .iter()
1230                .map(|flow| FlowDescriptor {
1231                    id: flow.id.as_str().to_string(),
1232                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1233                    profile: manifest.pack_id.as_str().to_string(),
1234                    version: manifest.version.to_string(),
1235                    description: None,
1236                })
1237                .collect();
1238            return Ok(descriptors);
1239        }
1240        Ok(Vec::new())
1241    }
1242
1243    #[allow(dead_code)]
1244    pub async fn run_flow(
1245        &self,
1246        flow_id: &str,
1247        input: serde_json::Value,
1248    ) -> Result<serde_json::Value> {
1249        let pack = Arc::new(
1250            PackRuntime::load(
1251                &self.path,
1252                Arc::clone(&self.config),
1253                self.mocks.clone(),
1254                self.archive_path.as_deref(),
1255                self.session_store.clone(),
1256                self.state_store.clone(),
1257                Arc::clone(&self.wasi_policy),
1258                self.secrets.clone(),
1259                self.oauth_config.clone(),
1260                false,
1261                ComponentResolution::default(),
1262            )
1263            .await?,
1264        );
1265
1266        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1267        let retry_config = self.config.retry_config().into();
1268        let mocks = pack.mocks.as_deref();
1269        let tenant = self.config.tenant.as_str();
1270
1271        let ctx = FlowContext {
1272            tenant,
1273            flow_id,
1274            node_id: None,
1275            tool: None,
1276            action: None,
1277            session_id: None,
1278            provider_id: None,
1279            retry_config,
1280            observer: None,
1281            mocks,
1282        };
1283
1284        let execution = engine.execute(ctx, input).await?;
1285        match execution.status {
1286            FlowStatus::Completed => Ok(execution.output),
1287            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1288                "status": "pending",
1289                "reason": wait.reason,
1290                "resume": wait.snapshot,
1291                "response": execution.output,
1292            })),
1293        }
1294    }
1295
    /// Invokes `operation` on the component registered as `component_ref`.
    ///
    /// A fresh linker, host state and store are built for every call. The
    /// v0.5 node binding is tried first; when the component does not export
    /// `greentic:component/node@0.5.0`, the v0.4 binding is used instead.
    ///
    /// Returns `Value::Null` for an empty successful body, the parsed JSON
    /// when the body is valid JSON, and the raw body as a JSON string
    /// otherwise. A `NodeError` reported by the component is returned as
    /// `Ok` with the shape `{"ok": false, "error": {...}}`, not as `Err`.
    ///
    /// `_config_json` is accepted but not used by this method.
    ///
    /// # Errors
    /// Fails when the component is unknown, when linking, instantiation or
    /// the call itself fails, or when binding construction fails for a
    /// reason other than a missing v0.5 node export.
    pub async fn invoke_component(
        &self,
        component_ref: &str,
        ctx: ComponentExecCtx,
        operation: &str,
        _config_json: Option<String>,
        input_json: String,
    ) -> Result<Value> {
        let pack_component = self
            .components
            .get(component_ref)
            .with_context(|| format!("component '{component_ref}' not found in pack"))?;

        let mut linker = Linker::new(&self.engine);
        // State-store imports are only linked for components allowed to use them.
        let allow_state_store = self.allows_state_store(component_ref);
        register_all(&mut linker, allow_state_store)?;
        add_component_control_to_linker(&mut linker)?;

        let host_state = HostState::new(
            Arc::clone(&self.config),
            Arc::clone(&self.http_client),
            self.mocks.clone(),
            self.session_store.clone(),
            self.state_store.clone(),
            Arc::clone(&self.secrets),
            self.oauth_config.clone(),
            Some(ctx.clone()),
        )?;
        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
        let mut store = wasmtime::Store::new(&self.engine, store_state);
        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
        let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
            Ok(pre) => {
                let bindings = pre.instantiate_async(&mut store).await?;
                let node = bindings.greentic_component_node();
                let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
                let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
                component_api::invoke_result_from_v0_5(result)
            }
            Err(err) => {
                if is_missing_node_export(&err, "0.5.0") {
                    // `pre_instance` was consumed above, so build a new one
                    // for the v0.4 binding.
                    let pre_instance = linker.instantiate_pre(&pack_component.component)?;
                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
                        component_api::v0_4::ComponentPre::new(pre_instance)?;
                    let bindings = pre.instantiate_async(&mut store).await?;
                    let node = bindings.greentic_component_node();
                    let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
                    let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
                    component_api::invoke_result_from_v0_4(result)
                } else {
                    return Err(err);
                }
            }
        };

        match result {
            InvokeResult::Ok(body) => {
                if body.is_empty() {
                    return Ok(Value::Null);
                }
                // Non-JSON bodies are passed through as a JSON string.
                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
            }
            InvokeResult::Err(NodeError {
                code,
                message,
                retryable,
                backoff_ms,
                details,
            }) => {
                // Component-level errors become a JSON error envelope.
                let mut obj = serde_json::Map::new();
                obj.insert("ok".into(), Value::Bool(false));
                let mut error = serde_json::Map::new();
                error.insert("code".into(), Value::String(code));
                error.insert("message".into(), Value::String(message));
                error.insert("retryable".into(), Value::Bool(retryable));
                if let Some(backoff) = backoff_ms {
                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
                }
                if let Some(details) = details {
                    // Details are embedded as JSON when they parse, else as a string.
                    error.insert(
                        "details".into(),
                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
                    );
                }
                obj.insert("error".into(), Value::Object(error));
                Ok(Value::Object(obj))
            }
        }
    }
1385
1386    pub fn resolve_provider(
1387        &self,
1388        provider_id: Option<&str>,
1389        provider_type: Option<&str>,
1390    ) -> Result<ProviderBinding> {
1391        let registry = self.provider_registry()?;
1392        registry.resolve(provider_id, provider_type)
1393    }
1394
    /// Invokes operation `op` on the provider component referenced by
    /// `binding`, passing `input_json` as raw bytes.
    ///
    /// A fresh linker, host state and store are built for every call. The
    /// bytes returned by the provider are decoded with
    /// `deserialize_json_bytes`.
    ///
    /// # Errors
    /// Fails when the component is unknown, when linking, instantiation or
    /// the call fails, or when the result bytes cannot be decoded.
    pub async fn invoke_provider(
        &self,
        binding: &ProviderBinding,
        ctx: ComponentExecCtx,
        op: &str,
        input_json: Vec<u8>,
    ) -> Result<Value> {
        let component_ref = &binding.component_ref;
        let pack_component = self
            .components
            .get(component_ref)
            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;

        let mut linker = Linker::new(&self.engine);
        // State-store imports are only linked for components allowed to use them.
        let allow_state_store = self.allows_state_store(component_ref);
        register_all(&mut linker, allow_state_store)?;
        add_component_control_to_linker(&mut linker)?;
        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;

        let host_state = HostState::new(
            Arc::clone(&self.config),
            Arc::clone(&self.http_client),
            self.mocks.clone(),
            self.session_store.clone(),
            self.state_store.clone(),
            Arc::clone(&self.secrets),
            self.oauth_config.clone(),
            Some(ctx),
        )?;
        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
        let mut store = wasmtime::Store::new(&self.engine, store_state);
        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
        let provider = bindings.greentic_provider_core_schema_core_api();

        let result = provider.call_invoke(&mut store, op, &input_json)?;
        deserialize_json_bytes(result)
    }
1433
1434    fn provider_registry(&self) -> Result<ProviderRegistry> {
1435        if let Some(registry) = self.provider_registry.read().clone() {
1436            return Ok(registry);
1437        }
1438        let manifest = self
1439            .manifest
1440            .as_ref()
1441            .context("pack manifest required for provider resolution")?;
1442        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1443        let registry = ProviderRegistry::new(
1444            manifest,
1445            self.state_store.clone(),
1446            &self.config.tenant,
1447            &env,
1448        )?;
1449        *self.provider_registry.write() = Some(registry.clone());
1450        Ok(registry)
1451    }
1452
1453    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1454        if let Some(cache) = &self.flows {
1455            return cache
1456                .flows
1457                .get(flow_id)
1458                .cloned()
1459                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1460        }
1461        if let Some(manifest) = &self.manifest {
1462            let entry = manifest
1463                .flows
1464                .iter()
1465                .find(|f| f.id.as_str() == flow_id)
1466                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1467            return Ok(entry.flow.clone());
1468        }
1469        bail!("flow '{flow_id}' not available (pack exports disabled)")
1470    }
1471
    /// Metadata describing this pack.
    pub fn metadata(&self) -> &PackMetadata {
        &self.metadata
    }
1475
    /// Secret requirements recorded in the pack metadata.
    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
        &self.metadata.secret_requirements
    }
1479
1480    pub fn missing_secrets(
1481        &self,
1482        tenant_ctx: &TypesTenantCtx,
1483    ) -> Vec<greentic_types::SecretRequirement> {
1484        let env = tenant_ctx.env.as_str().to_string();
1485        let tenant = tenant_ctx.tenant.as_str().to_string();
1486        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1487        self.required_secrets()
1488            .iter()
1489            .filter(|req| {
1490                // scope must match current context if provided
1491                if let Some(scope) = &req.scope {
1492                    if scope.env != env {
1493                        return false;
1494                    }
1495                    if scope.tenant != tenant {
1496                        return false;
1497                    }
1498                    if let Some(ref team_req) = scope.team
1499                        && team.as_ref() != Some(team_req)
1500                    {
1501                        return false;
1502                    }
1503                }
1504                read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1505            })
1506            .cloned()
1507            .collect()
1508    }
1509
1510    pub fn for_component_test(
1511        components: Vec<(String, PathBuf)>,
1512        flows: HashMap<String, FlowIR>,
1513        config: Arc<HostConfig>,
1514    ) -> Result<Self> {
1515        let engine = Engine::default();
1516        let mut component_map = HashMap::new();
1517        for (name, path) in components {
1518            if !path.exists() {
1519                bail!("component artifact missing: {}", path.display());
1520            }
1521            let wasm_bytes = std::fs::read(&path)?;
1522            let component = Component::from_binary(&engine, &wasm_bytes)
1523                .with_context(|| format!("failed to compile component {}", path.display()))?;
1524            component_map.insert(
1525                name.clone(),
1526                PackComponent {
1527                    name,
1528                    version: "0.0.0".into(),
1529                    component,
1530                },
1531            );
1532        }
1533
1534        let mut flow_map = HashMap::new();
1535        let mut descriptors = Vec::new();
1536        for (id, ir) in flows {
1537            let flow_type = ir.flow_type.clone();
1538            let flow = flow_ir_to_flow(ir)?;
1539            flow_map.insert(id.clone(), flow);
1540            descriptors.push(FlowDescriptor {
1541                id: id.clone(),
1542                flow_type,
1543                profile: "test".into(),
1544                version: "0.0.0".into(),
1545                description: None,
1546            });
1547        }
1548        let flows_cache = PackFlows {
1549            descriptors: descriptors.clone(),
1550            flows: flow_map,
1551            metadata: PackMetadata::fallback(Path::new("component-test")),
1552        };
1553
1554        Ok(Self {
1555            path: PathBuf::new(),
1556            archive_path: None,
1557            config,
1558            engine,
1559            metadata: PackMetadata::fallback(Path::new("component-test")),
1560            manifest: None,
1561            legacy_manifest: None,
1562            component_manifests: HashMap::new(),
1563            mocks: None,
1564            flows: Some(flows_cache),
1565            components: component_map,
1566            http_client: Arc::clone(&HTTP_CLIENT),
1567            pre_cache: Mutex::new(HashMap::new()),
1568            session_store: None,
1569            state_store: None,
1570            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1571            provider_registry: RwLock::new(None),
1572            secrets: crate::secrets::default_manager(),
1573            oauth_config: None,
1574        })
1575    }
1576}
1577
1578fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1579    let message = err.to_string();
1580    message.contains("no exported instance named")
1581        && message.contains(&format!("greentic:component/node@{version}"))
1582}
1583
/// Flows loaded from a pack together with the metadata derived alongside them.
struct PackFlows {
    /// One descriptor per flow, as returned by `list_flows`.
    descriptors: Vec<FlowDescriptor>,
    /// Flows keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Pack metadata captured while the flows were loaded.
    metadata: PackMetadata,
}
1589
/// Manifest extension keys that are recognized as carrying runtime flows.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1595
/// Object form of a runtime-flow extension payload: `{ "flows": [...] }`.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The flows carried by the extension.
    flows: Vec<RuntimeFlow>,
}
1600
/// Serialized form of a single flow inside a runtime-flow extension.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow identifier.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Optional schema version; absent when not provided.
    #[serde(default)]
    schema_version: Option<String>,
    /// Optional start value; presumably a node id — TODO confirm.
    #[serde(default)]
    start: Option<String>,
    /// Entrypoints by name; empty when not provided.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by name (required).
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
1616
/// Serialized form of a single node inside a `RuntimeFlow`.
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component id; also accepted under the key `component`.
    #[serde(alias = "component")]
    component_id: String,
    /// Optional operation name; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Operation payload; also accepted as `payload` or `input`, and
    /// defaulted when absent.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Optional routing information.
    #[serde(default)]
    routing: Option<Routing>,
    /// Optional telemetry hints.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
1630
1631fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1632    if bytes.is_empty() {
1633        return Ok(Value::Null);
1634    }
1635    serde_json::from_slice(&bytes).or_else(|_| {
1636        String::from_utf8(bytes)
1637            .map(Value::String)
1638            .map_err(|err| anyhow!(err))
1639    })
1640}
1641
1642impl PackFlows {
1643    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1644        if let Some(flows) = flows_from_runtime_extension(&manifest) {
1645            return flows;
1646        }
1647        let descriptors = manifest
1648            .flows
1649            .iter()
1650            .map(|entry| FlowDescriptor {
1651                id: entry.id.as_str().to_string(),
1652                flow_type: flow_kind_to_str(entry.kind).to_string(),
1653                profile: manifest.pack_id.as_str().to_string(),
1654                version: manifest.version.to_string(),
1655                description: None,
1656            })
1657            .collect();
1658        let mut flows = HashMap::new();
1659        for entry in &manifest.flows {
1660            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1661        }
1662        Self {
1663            metadata: PackMetadata::from_manifest(&manifest),
1664            descriptors,
1665            flows,
1666        }
1667    }
1668}
1669
1670fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1671    let extensions = manifest.extensions.as_ref()?;
1672    let extension = extensions.iter().find_map(|(key, ext)| {
1673        if RUNTIME_FLOW_EXTENSION_IDS
1674            .iter()
1675            .any(|candidate| candidate == key)
1676        {
1677            Some(ext)
1678        } else {
1679            None
1680        }
1681    })?;
1682    let runtime_flows = match decode_runtime_flow_extension(extension) {
1683        Some(flows) if !flows.is_empty() => flows,
1684        _ => return None,
1685    };
1686
1687    let descriptors = runtime_flows
1688        .iter()
1689        .map(|flow| FlowDescriptor {
1690            id: flow.id.as_str().to_string(),
1691            flow_type: flow_kind_to_str(flow.kind).to_string(),
1692            profile: manifest.pack_id.as_str().to_string(),
1693            version: manifest.version.to_string(),
1694            description: None,
1695        })
1696        .collect::<Vec<_>>();
1697    let flows = runtime_flows
1698        .into_iter()
1699        .map(|flow| (flow.id.as_str().to_string(), flow))
1700        .collect();
1701
1702    Some(PackFlows {
1703        metadata: PackMetadata::from_manifest(manifest),
1704        descriptors,
1705        flows,
1706    })
1707}
1708
1709fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1710    let value = match extension.inline.as_ref()? {
1711        ExtensionInline::Other(value) => value.clone(),
1712        _ => return None,
1713    };
1714
1715    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1716        return Some(collect_runtime_flows(bundle.flows));
1717    }
1718
1719    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1720        return Some(collect_runtime_flows(flows));
1721    }
1722
1723    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1724        return Some(flows);
1725    }
1726
1727    warn!(
1728        extension = %extension.kind,
1729        version = %extension.version,
1730        "runtime flow extension present but could not be decoded"
1731    );
1732    None
1733}
1734
1735fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1736    flows
1737        .into_iter()
1738        .filter_map(|flow| match runtime_flow_to_flow(flow) {
1739            Ok(flow) => Some(flow),
1740            Err(err) => {
1741                warn!(error = %err, "failed to decode runtime flow");
1742                None
1743            }
1744        })
1745        .collect()
1746}
1747
1748fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1749    let flow_id = FlowId::from_str(&runtime.id)
1750        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1751    let mut entrypoints = runtime.entrypoints;
1752    if entrypoints.is_empty()
1753        && let Some(start) = &runtime.start
1754    {
1755        entrypoints.insert("default".into(), Value::String(start.clone()));
1756    }
1757
1758    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1759    for (id, node) in runtime.nodes {
1760        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1761        let component_id = ComponentId::from_str(&node.component_id)
1762            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1763        let component = FlowComponentRef {
1764            id: component_id,
1765            pack_alias: None,
1766            operation: node.operation_name,
1767        };
1768        let routing = node.routing.unwrap_or(Routing::End);
1769        let telemetry = node.telemetry.unwrap_or_default();
1770        nodes.insert(
1771            node_id.clone(),
1772            Node {
1773                id: node_id,
1774                component,
1775                input: InputMapping {
1776                    mapping: node.operation_payload,
1777                },
1778                output: OutputMapping {
1779                    mapping: Value::Null,
1780                },
1781                routing,
1782                telemetry,
1783            },
1784        );
1785    }
1786
1787    Ok(Flow {
1788        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1789        id: flow_id,
1790        kind: runtime.kind,
1791        entrypoints,
1792        nodes,
1793        metadata: runtime.metadata.unwrap_or_default(),
1794    })
1795}
1796
1797fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1798    match kind {
1799        greentic_types::FlowKind::Messaging => "messaging",
1800        greentic_types::FlowKind::Event => "event",
1801        greentic_types::FlowKind::ComponentConfig => "component-config",
1802        greentic_types::FlowKind::Job => "job",
1803        greentic_types::FlowKind::Http => "http",
1804    }
1805}
1806
1807fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1808    let mut file = archive
1809        .by_name(name)
1810        .with_context(|| format!("entry {name} missing from archive"))?;
1811    let mut buf = Vec::new();
1812    file.read_to_end(&mut buf)?;
1813    Ok(buf)
1814}
1815
1816fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1817    for node in doc.nodes.values_mut() {
1818        let Some((component_ref, payload)) = node
1819            .raw
1820            .iter()
1821            .next()
1822            .map(|(key, value)| (key.clone(), value.clone()))
1823        else {
1824            continue;
1825        };
1826        if component_ref.starts_with("emit.") {
1827            node.operation = Some(component_ref);
1828            node.payload = payload;
1829            node.raw.clear();
1830            continue;
1831        }
1832        let (target_component, operation, input, config) =
1833            infer_component_exec(&payload, &component_ref);
1834        let mut payload_obj = serde_json::Map::new();
1835        // component.exec is meta; ensure the payload carries the actual target component.
1836        payload_obj.insert("component".into(), Value::String(target_component));
1837        payload_obj.insert("operation".into(), Value::String(operation));
1838        payload_obj.insert("input".into(), input);
1839        if let Some(cfg) = config {
1840            payload_obj.insert("config".into(), cfg);
1841        }
1842        node.operation = Some("component.exec".to_string());
1843        node.payload = Value::Object(payload_obj);
1844        node.raw.clear();
1845    }
1846    doc
1847}
1848
1849fn infer_component_exec(
1850    payload: &Value,
1851    component_ref: &str,
1852) -> (String, String, Value, Option<Value>) {
1853    let default_op = if component_ref.starts_with("templating.") {
1854        "render"
1855    } else {
1856        "invoke"
1857    }
1858    .to_string();
1859
1860    if let Value::Object(map) = payload {
1861        let op = map
1862            .get("op")
1863            .or_else(|| map.get("operation"))
1864            .and_then(Value::as_str)
1865            .map(|s| s.to_string())
1866            .unwrap_or_else(|| default_op.clone());
1867
1868        let mut input = map.clone();
1869        let config = input.remove("config");
1870        let component = input
1871            .get("component")
1872            .or_else(|| input.get("component_ref"))
1873            .and_then(Value::as_str)
1874            .map(|s| s.to_string())
1875            .unwrap_or_else(|| component_ref.to_string());
1876        input.remove("component");
1877        input.remove("component_ref");
1878        input.remove("op");
1879        input.remove("operation");
1880        return (component, op, Value::Object(input), config);
1881    }
1882
1883    (component_ref.to_string(), default_op, payload.clone(), None)
1884}
1885
/// Identity of a component the pack expects to load.
#[derive(Debug, Clone)]
struct ComponentSpec {
    /// Component id (for legacy manifests, the component name).
    id: String,
    /// Component version rendered as a string.
    version: String,
    /// Wasm file path recorded by a legacy manifest; `None` for current manifests.
    legacy_path: Option<String>,
}
1892
1893#[derive(Clone, Debug)]
1894struct ComponentSourceInfo {
1895    digest: String,
1896    source: ComponentSourceRef,
1897    artifact: ComponentArtifactLocation,
1898}
1899
/// Location of a component's wasm artifact.
#[derive(Debug, Clone)]
enum ComponentArtifactLocation {
    /// Bundled with the pack at `wasm_path`.
    Inline { wasm_path: String },
    /// Not bundled; resolved through the distributor client.
    Remote,
}
1905
1906fn component_specs(
1907    manifest: Option<&greentic_types::PackManifest>,
1908    legacy_manifest: Option<&legacy_pack::PackManifest>,
1909) -> Vec<ComponentSpec> {
1910    if let Some(manifest) = manifest {
1911        return manifest
1912            .components
1913            .iter()
1914            .map(|entry| ComponentSpec {
1915                id: entry.id.as_str().to_string(),
1916                version: entry.version.to_string(),
1917                legacy_path: None,
1918            })
1919            .collect();
1920    }
1921    if let Some(legacy_manifest) = legacy_manifest {
1922        return legacy_manifest
1923            .components
1924            .iter()
1925            .map(|entry| ComponentSpec {
1926                id: entry.name.clone(),
1927                version: entry.version.to_string(),
1928                legacy_path: Some(entry.file_wasm.clone()),
1929            })
1930            .collect();
1931    }
1932    Vec::new()
1933}
1934
1935fn component_sources_table(
1936    manifest: &greentic_types::PackManifest,
1937) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
1938    let Some(sources) = manifest.get_component_sources_v1()? else {
1939        return Ok(None);
1940    };
1941    let mut table = HashMap::new();
1942    for entry in sources.components {
1943        let artifact = match entry.artifact {
1944            ArtifactLocationV1::Inline { wasm_path, .. } => {
1945                ComponentArtifactLocation::Inline { wasm_path }
1946            }
1947            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
1948        };
1949        let info = ComponentSourceInfo {
1950            digest: entry.resolved.digest,
1951            source: entry.source,
1952            artifact,
1953        };
1954        if let Some(component_id) = entry.component_id.as_ref() {
1955            table.insert(component_id.as_str().to_string(), info.clone());
1956        }
1957        table.insert(entry.name, info);
1958    }
1959    Ok(Some(table))
1960}
1961
1962fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
1963    if let Some(path) = &spec.legacy_path {
1964        return root.join(path);
1965    }
1966    root.join("components").join(format!("{}.wasm", spec.id))
1967}
1968
/// Returns `digest` in canonical `<algorithm>:<hex>` form.
///
/// * A digest already prefixed with `sha256:` or `blake3:` keeps its prefix.
/// * A bare digest is assumed to be SHA-256 and gets the `sha256:` prefix.
///
/// The result is ASCII-lowercased. Hex digests are case-insensitive, and the
/// digests computed in this module are lowercase, so a manifest that records
/// `SHA256:AB…` or uppercase hex must still compare equal instead of being
/// double-prefixed or reported as a mismatch.
fn normalize_digest(digest: &str) -> String {
    let digest = digest.to_ascii_lowercase();
    if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
        digest
    } else {
        format!("sha256:{digest}")
    }
}
1976
1977fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
1978    if digest.starts_with("blake3:") {
1979        let hash = blake3::hash(bytes);
1980        return Ok(format!("blake3:{}", hash.to_hex()));
1981    }
1982    let mut hasher = sha2::Sha256::new();
1983    hasher.update(bytes);
1984    Ok(format!("sha256:{:x}", hasher.finalize()))
1985}
1986
1987fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
1988    let normalized_expected = normalize_digest(expected);
1989    let actual = compute_digest_for(bytes, &normalized_expected)?;
1990    if normalize_digest(&actual) != normalized_expected {
1991        bail!(
1992            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
1993        );
1994    }
1995    Ok(())
1996}
1997
1998fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
1999    let mut opts = DistOptions {
2000        allow_tags: true,
2001        ..DistOptions::default()
2002    };
2003    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2004        opts.cache_dir = cache_dir;
2005    }
2006    if component_resolution.dist_offline {
2007        opts.offline = true;
2008    }
2009    opts
2010}
2011
2012#[allow(clippy::too_many_arguments)]
2013async fn load_components_from_sources(
2014    engine: &Engine,
2015    component_sources: &HashMap<String, ComponentSourceInfo>,
2016    component_resolution: &ComponentResolution,
2017    specs: &[ComponentSpec],
2018    missing: &mut HashSet<String>,
2019    into: &mut HashMap<String, PackComponent>,
2020    materialized_root: Option<&Path>,
2021    archive_hint: Option<&Path>,
2022) -> Result<()> {
2023    let mut archive = if let Some(path) = archive_hint {
2024        Some(
2025            ZipArchive::new(File::open(path)?)
2026                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
2027        )
2028    } else {
2029        None
2030    };
2031    let mut dist_client: Option<DistClient> = None;
2032
2033    for spec in specs {
2034        if !missing.contains(&spec.id) {
2035            continue;
2036        }
2037        let Some(source) = component_sources.get(&spec.id) else {
2038            continue;
2039        };
2040
2041        let bytes = match &source.artifact {
2042            ComponentArtifactLocation::Inline { wasm_path } => {
2043                if let Some(root) = materialized_root {
2044                    let path = root.join(wasm_path);
2045                    if path.exists() {
2046                        std::fs::read(&path).with_context(|| {
2047                            format!(
2048                                "failed to read inline component {} from {}",
2049                                spec.id,
2050                                path.display()
2051                            )
2052                        })?
2053                    } else if archive.is_none() {
2054                        bail!("inline component {} missing at {}", spec.id, path.display());
2055                    } else {
2056                        read_entry(
2057                            archive.as_mut().expect("archive present when needed"),
2058                            wasm_path,
2059                        )
2060                        .with_context(|| {
2061                            format!(
2062                                "inline component {} missing at {} in pack archive",
2063                                spec.id, wasm_path
2064                            )
2065                        })?
2066                    }
2067                } else if let Some(archive) = archive.as_mut() {
2068                    read_entry(archive, wasm_path).with_context(|| {
2069                        format!(
2070                            "inline component {} missing at {} in pack archive",
2071                            spec.id, wasm_path
2072                        )
2073                    })?
2074                } else {
2075                    bail!(
2076                        "inline component {} missing and no pack source available",
2077                        spec.id
2078                    );
2079                }
2080            }
2081            ComponentArtifactLocation::Remote => {
2082                let client = dist_client.get_or_insert_with(|| {
2083                    DistClient::new(dist_options_from(component_resolution))
2084                });
2085                let reference = source.source.to_string();
2086                let cache_path = if component_resolution.dist_offline {
2087                    client
2088                        .fetch_digest(&source.digest)
2089                        .await
2090                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2091                } else {
2092                    let resolved = client
2093                        .resolve_ref(&reference)
2094                        .await
2095                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2096                    let expected = normalize_digest(&source.digest);
2097                    let actual = normalize_digest(&resolved.digest);
2098                    if expected != actual {
2099                        bail!(
2100                            "component {} digest mismatch after fetch: expected {}, got {}",
2101                            spec.id,
2102                            expected,
2103                            actual
2104                        );
2105                    }
2106                    resolved.cache_path.ok_or_else(|| {
2107                        anyhow!(
2108                            "component {} resolved from {} but cache path is missing",
2109                            spec.id,
2110                            reference
2111                        )
2112                    })?
2113                };
2114                std::fs::read(&cache_path).with_context(|| {
2115                    format!(
2116                        "failed to read cached component {} from {}",
2117                        spec.id,
2118                        cache_path.display()
2119                    )
2120                })?
2121            }
2122        };
2123
2124        verify_component_digest(&spec.id, &source.digest, &bytes)?;
2125        let component = Component::from_binary(engine, &bytes)
2126            .with_context(|| format!("failed to compile component {}", spec.id))?;
2127        into.insert(
2128            spec.id.clone(),
2129            PackComponent {
2130                name: spec.id.clone(),
2131                version: spec.version.clone(),
2132                component,
2133            },
2134        );
2135        missing.remove(&spec.id);
2136    }
2137
2138    Ok(())
2139}
2140
2141fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2142    match err {
2143        DistError::CacheMiss { reference: missing } => anyhow!(
2144            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2145            component_id,
2146            missing,
2147            reference
2148        ),
2149        DistError::Offline { reference: blocked } => anyhow!(
2150            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2151            component_id,
2152            blocked,
2153            reference
2154        ),
2155        DistError::AuthRequired { target } => anyhow!(
2156            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2157            component_id,
2158            target,
2159            reference
2160        ),
2161        other => anyhow!(
2162            "failed to resolve component {} from {}: {}",
2163            component_id,
2164            reference,
2165            other
2166        ),
2167    }
2168}
2169
2170fn load_components_from_overrides(
2171    engine: &Engine,
2172    overrides: &HashMap<String, PathBuf>,
2173    specs: &[ComponentSpec],
2174    missing: &mut HashSet<String>,
2175    into: &mut HashMap<String, PackComponent>,
2176) -> Result<()> {
2177    for spec in specs {
2178        if !missing.contains(&spec.id) {
2179            continue;
2180        }
2181        let Some(path) = overrides.get(&spec.id) else {
2182            continue;
2183        };
2184        let bytes = std::fs::read(path)
2185            .with_context(|| format!("failed to read override component {}", path.display()))?;
2186        let component = Component::from_binary(engine, &bytes).with_context(|| {
2187            format!(
2188                "failed to compile component {} from override {}",
2189                spec.id,
2190                path.display()
2191            )
2192        })?;
2193        into.insert(
2194            spec.id.clone(),
2195            PackComponent {
2196                name: spec.id.clone(),
2197                version: spec.version.clone(),
2198                component,
2199            },
2200        );
2201        missing.remove(&spec.id);
2202    }
2203    Ok(())
2204}
2205
2206fn load_components_from_dir(
2207    engine: &Engine,
2208    root: &Path,
2209    specs: &[ComponentSpec],
2210    missing: &mut HashSet<String>,
2211    into: &mut HashMap<String, PackComponent>,
2212) -> Result<()> {
2213    for spec in specs {
2214        if !missing.contains(&spec.id) {
2215            continue;
2216        }
2217        let path = component_path_for_spec(root, spec);
2218        if !path.exists() {
2219            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2220            continue;
2221        }
2222        let bytes = std::fs::read(&path)
2223            .with_context(|| format!("failed to read component {}", path.display()))?;
2224        let component = Component::from_binary(engine, &bytes).with_context(|| {
2225            format!(
2226                "failed to compile component {} from {}",
2227                spec.id,
2228                path.display()
2229            )
2230        })?;
2231        into.insert(
2232            spec.id.clone(),
2233            PackComponent {
2234                name: spec.id.clone(),
2235                version: spec.version.clone(),
2236                component,
2237            },
2238        );
2239        missing.remove(&spec.id);
2240    }
2241    Ok(())
2242}
2243
2244fn load_components_from_archive(
2245    engine: &Engine,
2246    path: &Path,
2247    specs: &[ComponentSpec],
2248    missing: &mut HashSet<String>,
2249    into: &mut HashMap<String, PackComponent>,
2250) -> Result<()> {
2251    let mut archive = ZipArchive::new(File::open(path)?)
2252        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2253    for spec in specs {
2254        if !missing.contains(&spec.id) {
2255            continue;
2256        }
2257        let file_name = spec
2258            .legacy_path
2259            .clone()
2260            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2261        let bytes = match read_entry(&mut archive, &file_name) {
2262            Ok(bytes) => bytes,
2263            Err(err) => {
2264                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2265                continue;
2266            }
2267        };
2268        let component = Component::from_binary(engine, &bytes)
2269            .with_context(|| format!("failed to compile component {}", spec.id))?;
2270        into.insert(
2271            spec.id.clone(),
2272            PackComponent {
2273                name: spec.id.clone(),
2274                version: spec.version.clone(),
2275                component,
2276            },
2277        );
2278        missing.remove(&spec.id);
2279    }
2280    Ok(())
2281}
2282
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node keyed by a raw component reference is rewritten into a
    /// `component.exec` operation whose payload names the component, the
    /// inferred operation and the original fields as input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let mut raw = IndexMap::new();
        raw.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };
        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let normalized = normalize_flow_doc(FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            nodes,
        });

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        let expected_component = Value::String("templating.handlebars".into());
        assert_eq!(payload.get("component"), Some(&expected_component));
        let expected_operation = Value::String("render".into());
        assert_eq!(payload.get("operation"), Some(&expected_operation));
        let input = payload.get("input").unwrap();
        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
    }
}
2336
2337#[derive(Clone, Debug, Default, Serialize, Deserialize)]
2338pub struct PackMetadata {
2339    pub pack_id: String,
2340    pub version: String,
2341    #[serde(default)]
2342    pub entry_flows: Vec<String>,
2343    #[serde(default)]
2344    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
2345}
2346
2347impl PackMetadata {
2348    fn from_wasm(bytes: &[u8]) -> Option<Self> {
2349        let parser = Parser::new(0);
2350        for payload in parser.parse_all(bytes) {
2351            let payload = payload.ok()?;
2352            match payload {
2353                Payload::CustomSection(section) => {
2354                    if section.name() == "greentic.manifest"
2355                        && let Ok(meta) = Self::from_bytes(section.data())
2356                    {
2357                        return Some(meta);
2358                    }
2359                }
2360                Payload::DataSection(reader) => {
2361                    for segment in reader.into_iter().flatten() {
2362                        if let Ok(meta) = Self::from_bytes(segment.data) {
2363                            return Some(meta);
2364                        }
2365                    }
2366                }
2367                _ => {}
2368            }
2369        }
2370        None
2371    }
2372
2373    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2374        #[derive(Deserialize)]
2375        struct RawManifest {
2376            pack_id: String,
2377            version: String,
2378            #[serde(default)]
2379            entry_flows: Vec<String>,
2380            #[serde(default)]
2381            flows: Vec<RawFlow>,
2382            #[serde(default)]
2383            secret_requirements: Vec<greentic_types::SecretRequirement>,
2384        }
2385
2386        #[derive(Deserialize)]
2387        struct RawFlow {
2388            id: String,
2389        }
2390
2391        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2392        let mut entry_flows = if manifest.entry_flows.is_empty() {
2393            manifest.flows.iter().map(|f| f.id.clone()).collect()
2394        } else {
2395            manifest.entry_flows.clone()
2396        };
2397        entry_flows.retain(|id| !id.is_empty());
2398        Ok(Self {
2399            pack_id: manifest.pack_id,
2400            version: manifest.version,
2401            entry_flows,
2402            secret_requirements: manifest.secret_requirements,
2403        })
2404    }
2405
2406    pub fn fallback(path: &Path) -> Self {
2407        let pack_id = path
2408            .file_stem()
2409            .map(|s| s.to_string_lossy().into_owned())
2410            .unwrap_or_else(|| "unknown-pack".to_string());
2411        Self {
2412            pack_id,
2413            version: "0.0.0".to_string(),
2414            entry_flows: Vec::new(),
2415            secret_requirements: Vec::new(),
2416        }
2417    }
2418
2419    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2420        let entry_flows = manifest
2421            .flows
2422            .iter()
2423            .map(|flow| flow.id.as_str().to_string())
2424            .collect::<Vec<_>>();
2425        Self {
2426            pack_id: manifest.pack_id.as_str().to_string(),
2427            version: manifest.version.to_string(),
2428            entry_flows,
2429            secret_requirements: manifest.secret_requirements.clone(),
2430        }
2431    }
2432}