Skip to main content

greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::{
10    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
13use crate::provider::{ProviderBinding, ProviderRegistry};
14use crate::provider_core::SchemaCorePre as ProviderComponentPre;
15use crate::provider_core_only;
16use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
17use anyhow::{Context, Result, anyhow, bail};
18use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20    self as host_v1, HostFns, add_all_v1_to_linker,
21    runner_host_http::RunnerHostHttp,
22    runner_host_kv::RunnerHostKv,
23    secrets_store::{SecretsError, SecretsStoreHost},
24    state_store::{
25        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26        StateStoreHost, TenantCtx as StateTenantCtx,
27    },
28    telemetry_logger::{
29        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31        TenantCtx as TelemetryTenantCtx,
32    },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::flow::FlowHasher;
38use greentic_types::{
39    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
40    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
41    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
42    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
43    pack_manifest::ExtensionInline,
44};
45use host_v1::http_client::{
46    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
47    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
48    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
49    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
50};
51use indexmap::IndexMap;
52use once_cell::sync::Lazy;
53use parking_lot::{Mutex, RwLock};
54use reqwest::blocking::Client as BlockingClient;
55use runner_core::normalize_under_root;
56use serde::{Deserialize, Serialize};
57use serde_cbor;
58use serde_json::{self, Value};
59use sha2::Digest;
60use tokio::fs;
61use wasmparser::{Parser, Payload};
62use wasmtime::StoreContextMut;
63use zip::ZipArchive;
64
65use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
66use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
67use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
68
69use crate::config::HostConfig;
70use crate::secrets::{DynSecretsManager, read_secret_blocking};
71use crate::storage::state::STATE_PREFIX;
72use crate::storage::{DynSessionStore, DynStateStore};
73use crate::verify;
74use crate::wasi::RunnerWasiPolicy;
75use tracing::warn;
76use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
77use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
78
79use greentic_flow::model::FlowDoc;
80
81#[allow(dead_code)]
82pub struct PackRuntime {
83    /// Component artifact path (wasm file).
84    path: PathBuf,
85    /// Optional archive (.gtpack) used to load flows/manifests.
86    archive_path: Option<PathBuf>,
87    config: Arc<HostConfig>,
88    engine: Engine,
89    metadata: PackMetadata,
90    manifest: Option<greentic_types::PackManifest>,
91    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
92    component_manifests: HashMap<String, ComponentManifest>,
93    mocks: Option<Arc<MockLayer>>,
94    flows: Option<PackFlows>,
95    components: HashMap<String, PackComponent>,
96    http_client: Arc<BlockingClient>,
97    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
98    session_store: Option<DynSessionStore>,
99    state_store: Option<DynStateStore>,
100    wasi_policy: Arc<RunnerWasiPolicy>,
101    provider_registry: RwLock<Option<ProviderRegistry>>,
102    secrets: DynSecretsManager,
103    oauth_config: Option<OAuthBrokerConfig>,
104}
105
106struct PackComponent {
107    #[allow(dead_code)]
108    name: String,
109    #[allow(dead_code)]
110    version: String,
111    component: Component,
112}
113
114#[derive(Debug, Default, Clone)]
115pub struct ComponentResolution {
116    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
117    pub materialized_root: Option<PathBuf>,
118    /// Explicit overrides mapping component id -> wasm path.
119    pub overrides: HashMap<String, PathBuf>,
120    /// If true, do not fetch remote components; require cached artifacts.
121    pub dist_offline: bool,
122    /// Optional cache directory for resolved remote components.
123    pub dist_cache_dir: Option<PathBuf>,
124}
125
126fn build_blocking_client() -> BlockingClient {
127    std::thread::spawn(|| {
128        BlockingClient::builder()
129            .no_proxy()
130            .build()
131            .expect("blocking client")
132    })
133    .join()
134    .expect("client build thread panicked")
135}
136
137fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
138    let (root, candidate) = if path.is_absolute() {
139        let parent = path
140            .parent()
141            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
142        let root = parent
143            .canonicalize()
144            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
145        let file = path
146            .file_name()
147            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
148        (root, PathBuf::from(file))
149    } else {
150        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
151        let base = if let Some(parent) = path.parent() {
152            cwd.join(parent)
153        } else {
154            cwd
155        };
156        let root = base
157            .canonicalize()
158            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
159        let file = path
160            .file_name()
161            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
162        (root, PathBuf::from(file))
163    };
164    let safe = normalize_under_root(&root, &candidate)?;
165    Ok((root, safe))
166}
167
168static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct FlowDescriptor {
172    pub id: String,
173    #[serde(rename = "type")]
174    pub flow_type: String,
175    pub profile: String,
176    pub version: String,
177    #[serde(default)]
178    pub description: Option<String>,
179}
180
181pub struct HostState {
182    config: Arc<HostConfig>,
183    http_client: Arc<BlockingClient>,
184    default_env: String,
185    #[allow(dead_code)]
186    session_store: Option<DynSessionStore>,
187    state_store: Option<DynStateStore>,
188    mocks: Option<Arc<MockLayer>>,
189    secrets: DynSecretsManager,
190    oauth_config: Option<OAuthBrokerConfig>,
191    oauth_host: OAuthBrokerHost,
192    exec_ctx: Option<ComponentExecCtx>,
193}
194
195impl HostState {
196    #[allow(clippy::default_constructed_unit_structs)]
197    #[allow(clippy::too_many_arguments)]
198    pub fn new(
199        config: Arc<HostConfig>,
200        http_client: Arc<BlockingClient>,
201        mocks: Option<Arc<MockLayer>>,
202        session_store: Option<DynSessionStore>,
203        state_store: Option<DynStateStore>,
204        secrets: DynSecretsManager,
205        oauth_config: Option<OAuthBrokerConfig>,
206        exec_ctx: Option<ComponentExecCtx>,
207    ) -> Result<Self> {
208        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
209        Ok(Self {
210            config,
211            http_client,
212            default_env,
213            session_store,
214            state_store,
215            mocks,
216            secrets,
217            oauth_config,
218            oauth_host: OAuthBrokerHost::default(),
219            exec_ctx,
220        })
221    }
222
223    pub fn get_secret(&self, key: &str) -> Result<String> {
224        if provider_core_only::is_enabled() {
225            bail!(provider_core_only::blocked_message("secrets"))
226        }
227        if !self.config.secrets_policy.is_allowed(key) {
228            bail!("secret {key} is not permitted by bindings policy");
229        }
230        if let Some(mock) = &self.mocks
231            && let Some(value) = mock.secrets_lookup(key)
232        {
233            return Ok(value);
234        }
235        let bytes = read_secret_blocking(&self.secrets, key)
236            .context("failed to read secret from manager")?;
237        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
238        Ok(value)
239    }
240
241    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
242        let tenant_raw = ctx
243            .as_ref()
244            .map(|ctx| ctx.tenant.clone())
245            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
246            .unwrap_or_else(|| self.config.tenant.clone());
247        let env_raw = ctx
248            .as_ref()
249            .map(|ctx| ctx.env.clone())
250            .unwrap_or_else(|| self.default_env.clone());
251        let tenant_id = TenantId::from_str(&tenant_raw)
252            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
253        let env_id = EnvId::from_str(&env_raw)
254            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
255        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
256        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
257            if let Some(team) = exec_ctx.tenant.team.as_ref() {
258                let team_id =
259                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
260                tenant_ctx = tenant_ctx.with_team(Some(team_id));
261            }
262            if let Some(user) = exec_ctx.tenant.user.as_ref() {
263                let user_id =
264                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
265                tenant_ctx = tenant_ctx.with_user(Some(user_id));
266            }
267            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
268            if let Some(node) = exec_ctx.node_id.as_ref() {
269                tenant_ctx = tenant_ctx.with_node(node.clone());
270            }
271            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
272                tenant_ctx = tenant_ctx.with_session(session.clone());
273            }
274            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
275        }
276
277        if let Some(ctx) = ctx {
278            if let Some(team) = ctx.team.or(ctx.team_id) {
279                let team_id =
280                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
281                tenant_ctx = tenant_ctx.with_team(Some(team_id));
282            }
283            if let Some(user) = ctx.user.or(ctx.user_id) {
284                let user_id =
285                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
286                tenant_ctx = tenant_ctx.with_user(Some(user_id));
287            }
288            if let Some(flow) = ctx.flow_id {
289                tenant_ctx = tenant_ctx.with_flow(flow);
290            }
291            if let Some(node) = ctx.node_id {
292                tenant_ctx = tenant_ctx.with_node(node);
293            }
294            if let Some(provider) = ctx.provider_id {
295                tenant_ctx = tenant_ctx.with_provider(provider);
296            }
297            if let Some(session) = ctx.session_id {
298                tenant_ctx = tenant_ctx.with_session(session);
299            }
300            tenant_ctx.trace_id = ctx.trace_id;
301        }
302        Ok(tenant_ctx)
303    }
304
305    fn send_http_request(
306        &mut self,
307        req: HttpRequest,
308        opts: Option<HttpRequestOptionsV1_1>,
309        _ctx: Option<HttpTenantCtx>,
310    ) -> Result<HttpResponse, HttpClientError> {
311        if !self.config.http_enabled {
312            return Err(HttpClientError {
313                code: "denied".into(),
314                message: "http client disabled by policy".into(),
315            });
316        }
317
318        let mut mock_state = None;
319        let raw_body = req.body.clone();
320        if let Some(mock) = &self.mocks
321            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
322        {
323            match mock.http_begin(&meta) {
324                HttpDecision::Mock(response) => {
325                    let headers = response
326                        .headers
327                        .iter()
328                        .map(|(k, v)| (k.clone(), v.clone()))
329                        .collect();
330                    return Ok(HttpResponse {
331                        status: response.status,
332                        headers,
333                        body: response.body.clone().map(|b| b.into_bytes()),
334                    });
335                }
336                HttpDecision::Deny(reason) => {
337                    return Err(HttpClientError {
338                        code: "denied".into(),
339                        message: reason,
340                    });
341                }
342                HttpDecision::Passthrough { record } => {
343                    mock_state = Some((meta, record));
344                }
345            }
346        }
347
348        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
349        let mut builder = self.http_client.request(method, &req.url);
350        for (key, value) in req.headers {
351            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
352                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
353            {
354                builder = builder.header(header, header_value);
355            }
356        }
357
358        if let Some(body) = raw_body.clone() {
359            builder = builder.body(body);
360        }
361
362        if let Some(opts) = opts {
363            if let Some(timeout_ms) = opts.timeout_ms {
364                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
365            }
366            if opts.allow_insecure == Some(true) {
367                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
368            }
369            if let Some(follow_redirects) = opts.follow_redirects
370                && !follow_redirects
371            {
372                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
373            }
374        }
375
376        let response = match builder.send() {
377            Ok(resp) => resp,
378            Err(err) => {
379                warn!(url = %req.url, error = %err, "http client request failed");
380                return Err(HttpClientError {
381                    code: "unavailable".into(),
382                    message: err.to_string(),
383                });
384            }
385        };
386
387        let status = response.status().as_u16();
388        let headers_vec = response
389            .headers()
390            .iter()
391            .map(|(k, v)| {
392                (
393                    k.as_str().to_string(),
394                    v.to_str().unwrap_or_default().to_string(),
395                )
396            })
397            .collect::<Vec<_>>();
398        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
399
400        if let Some((meta, true)) = mock_state.take()
401            && let Some(mock) = &self.mocks
402        {
403            let recorded = HttpMockResponse::new(
404                status,
405                headers_vec.clone().into_iter().collect(),
406                body_bytes
407                    .as_ref()
408                    .map(|b| String::from_utf8_lossy(b).into_owned()),
409            );
410            mock.http_record(&meta, &recorded);
411        }
412
413        Ok(HttpResponse {
414            status,
415            headers: headers_vec,
416            body: body_bytes,
417        })
418    }
419}
420
421impl SecretsStoreHost for HostState {
422    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
423        if provider_core_only::is_enabled() {
424            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
425            return Err(SecretsError::Denied);
426        }
427        if !self.config.secrets_policy.is_allowed(&key) {
428            return Err(SecretsError::Denied);
429        }
430        if let Some(mock) = &self.mocks
431            && let Some(value) = mock.secrets_lookup(&key)
432        {
433            return Ok(Some(value.into_bytes()));
434        }
435        match read_secret_blocking(&self.secrets, &key) {
436            Ok(bytes) => Ok(Some(bytes)),
437            Err(err) => {
438                warn!(secret = %key, error = %err, "secret lookup failed");
439                Err(SecretsError::NotFound)
440            }
441        }
442    }
443}
444
445impl HttpClientHost for HostState {
446    fn send(
447        &mut self,
448        req: HttpRequest,
449        ctx: Option<HttpTenantCtx>,
450    ) -> Result<HttpResponse, HttpClientError> {
451        self.send_http_request(req, None, ctx)
452    }
453}
454
455impl HttpClientHostV1_1 for HostState {
456    fn send(
457        &mut self,
458        req: HttpRequestV1_1,
459        opts: Option<HttpRequestOptionsV1_1>,
460        ctx: Option<HttpTenantCtxV1_1>,
461    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
462        let legacy_req = HttpRequest {
463            method: req.method,
464            url: req.url,
465            headers: req.headers,
466            body: req.body,
467        };
468        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
469            env: ctx.env,
470            tenant: ctx.tenant,
471            tenant_id: ctx.tenant_id,
472            team: ctx.team,
473            team_id: ctx.team_id,
474            user: ctx.user,
475            user_id: ctx.user_id,
476            trace_id: ctx.trace_id,
477            correlation_id: ctx.correlation_id,
478            attributes: ctx.attributes,
479            session_id: ctx.session_id,
480            flow_id: ctx.flow_id,
481            node_id: ctx.node_id,
482            provider_id: ctx.provider_id,
483            deadline_ms: ctx.deadline_ms,
484            attempt: ctx.attempt,
485            idempotency_key: ctx.idempotency_key,
486            impersonation: ctx
487                .impersonation
488                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
489                    actor_id,
490                    reason,
491                }),
492        });
493
494        self.send_http_request(legacy_req, opts, legacy_ctx)
495            .map(|resp| HttpResponseV1_1 {
496                status: resp.status,
497                headers: resp.headers,
498                body: resp.body,
499            })
500            .map_err(|err| HttpClientErrorV1_1 {
501                code: err.code,
502                message: err.message,
503            })
504    }
505}
506
507impl StateStoreHost for HostState {
508    fn read(
509        &mut self,
510        key: HostStateKey,
511        ctx: Option<StateTenantCtx>,
512    ) -> Result<Vec<u8>, StateError> {
513        let store = match self.state_store.as_ref() {
514            Some(store) => store.clone(),
515            None => {
516                return Err(StateError {
517                    code: "unavailable".into(),
518                    message: "state store not configured".into(),
519                });
520            }
521        };
522        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
523            Ok(ctx) => ctx,
524            Err(err) => {
525                return Err(StateError {
526                    code: "invalid-ctx".into(),
527                    message: err.to_string(),
528                });
529            }
530        };
531        let key = StoreStateKey::from(key);
532        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
533            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
534            Ok(None) => Err(StateError {
535                code: "not_found".into(),
536                message: "state key not found".into(),
537            }),
538            Err(err) => Err(StateError {
539                code: "internal".into(),
540                message: err.to_string(),
541            }),
542        }
543    }
544
545    fn write(
546        &mut self,
547        key: HostStateKey,
548        bytes: Vec<u8>,
549        ctx: Option<StateTenantCtx>,
550    ) -> Result<StateOpAck, StateError> {
551        let store = match self.state_store.as_ref() {
552            Some(store) => store.clone(),
553            None => {
554                return Err(StateError {
555                    code: "unavailable".into(),
556                    message: "state store not configured".into(),
557                });
558            }
559        };
560        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
561            Ok(ctx) => ctx,
562            Err(err) => {
563                return Err(StateError {
564                    code: "invalid-ctx".into(),
565                    message: err.to_string(),
566                });
567            }
568        };
569        let key = StoreStateKey::from(key);
570        let value = serde_json::from_slice(&bytes)
571            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
572        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
573            Ok(()) => Ok(StateOpAck::Ok),
574            Err(err) => Err(StateError {
575                code: "internal".into(),
576                message: err.to_string(),
577            }),
578        }
579    }
580
581    fn delete(
582        &mut self,
583        key: HostStateKey,
584        ctx: Option<StateTenantCtx>,
585    ) -> Result<StateOpAck, StateError> {
586        let store = match self.state_store.as_ref() {
587            Some(store) => store.clone(),
588            None => {
589                return Err(StateError {
590                    code: "unavailable".into(),
591                    message: "state store not configured".into(),
592                });
593            }
594        };
595        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
596            Ok(ctx) => ctx,
597            Err(err) => {
598                return Err(StateError {
599                    code: "invalid-ctx".into(),
600                    message: err.to_string(),
601                });
602            }
603        };
604        let key = StoreStateKey::from(key);
605        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
606            Ok(_) => Ok(StateOpAck::Ok),
607            Err(err) => Err(StateError {
608                code: "internal".into(),
609                message: err.to_string(),
610            }),
611        }
612    }
613}
614
615impl TelemetryLoggerHost for HostState {
616    fn log(
617        &mut self,
618        span: TelemetrySpanContext,
619        fields: Vec<(String, String)>,
620        _ctx: Option<TelemetryTenantCtx>,
621    ) -> Result<TelemetryAck, TelemetryError> {
622        if let Some(mock) = &self.mocks
623            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
624        {
625            return Ok(TelemetryAck::Ok);
626        }
627        let mut map = serde_json::Map::new();
628        for (k, v) in fields {
629            map.insert(k, Value::String(v));
630        }
631        tracing::info!(
632            tenant = %span.tenant,
633            flow_id = %span.flow_id,
634            node = ?span.node_id,
635            provider = %span.provider,
636            fields = %serde_json::Value::Object(map.clone()),
637            "telemetry log from pack"
638        );
639        Ok(TelemetryAck::Ok)
640    }
641}
642
643impl RunnerHostHttp for HostState {
644    fn request(
645        &mut self,
646        method: String,
647        url: String,
648        headers: Vec<String>,
649        body: Option<Vec<u8>>,
650    ) -> Result<Vec<u8>, String> {
651        let req = HttpRequest {
652            method,
653            url,
654            headers: headers
655                .chunks(2)
656                .filter_map(|chunk| {
657                    if chunk.len() == 2 {
658                        Some((chunk[0].clone(), chunk[1].clone()))
659                    } else {
660                        None
661                    }
662                })
663                .collect(),
664            body,
665        };
666        match HttpClientHost::send(self, req, None) {
667            Ok(resp) => Ok(resp.body.unwrap_or_default()),
668            Err(err) => Err(err.message),
669        }
670    }
671}
672
673impl RunnerHostKv for HostState {
674    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
675        None
676    }
677
678    fn put(&mut self, _ns: String, _key: String, _val: String) {}
679}
680
681enum ManifestLoad {
682    New {
683        manifest: Box<greentic_types::PackManifest>,
684        flows: PackFlows,
685    },
686    Legacy {
687        manifest: Box<legacy_pack::PackManifest>,
688        flows: PackFlows,
689    },
690}
691
692fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
693    let mut archive = ZipArchive::new(File::open(path)?)
694        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
695    let bytes = read_entry(&mut archive, "manifest.cbor")
696        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
697    match decode_pack_manifest(&bytes) {
698        Ok(manifest) => {
699            let cache = PackFlows::from_manifest(manifest.clone());
700            Ok(ManifestLoad::New {
701                manifest: Box::new(manifest),
702                flows: cache,
703            })
704        }
705        Err(err) => {
706            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
707            // Fall back to legacy pack manifest
708            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
709                .context("failed to decode legacy pack manifest from manifest.cbor")?;
710            let flows = load_legacy_flows(&mut archive, &legacy)?;
711            Ok(ManifestLoad::Legacy {
712                manifest: Box::new(legacy),
713                flows,
714            })
715        }
716    }
717}
718
719fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
720    let manifest_path = root.join("manifest.cbor");
721    let bytes = std::fs::read(&manifest_path)
722        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
723    match decode_pack_manifest(&bytes) {
724        Ok(manifest) => {
725            let cache = PackFlows::from_manifest(manifest.clone());
726            Ok(ManifestLoad::New {
727                manifest: Box::new(manifest),
728                flows: cache,
729            })
730        }
731        Err(err) => {
732            tracing::debug!(
733                error = %err,
734                pack = %root.display(),
735                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
736            );
737            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
738                .context("failed to decode legacy pack manifest from manifest.cbor")?;
739            let flows = load_legacy_flows_from_dir(root, &legacy)?;
740            Ok(ManifestLoad::Legacy {
741                manifest: Box::new(legacy),
742                flows,
743            })
744        }
745    }
746}
747
748fn load_legacy_flows(
749    archive: &mut ZipArchive<File>,
750    manifest: &legacy_pack::PackManifest,
751) -> Result<PackFlows> {
752    let mut flows = HashMap::new();
753    let mut descriptors = Vec::new();
754
755    for entry in &manifest.flows {
756        let bytes = read_entry(archive, &entry.file_json)
757            .with_context(|| format!("missing flow json {}", entry.file_json))?;
758        let doc: FlowDoc = serde_json::from_slice(&bytes)
759            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
760        let normalized = normalize_flow_doc(doc);
761        let flow_ir = flow_doc_to_ir(normalized)?;
762        let flow = flow_ir_to_flow(flow_ir)?;
763
764        descriptors.push(FlowDescriptor {
765            id: entry.id.clone(),
766            flow_type: entry.kind.clone(),
767            profile: manifest.meta.pack_id.clone(),
768            version: manifest.meta.version.to_string(),
769            description: None,
770        });
771        flows.insert(entry.id.clone(), flow);
772    }
773
774    let mut entry_flows = manifest.meta.entry_flows.clone();
775    if entry_flows.is_empty() {
776        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
777    }
778    let metadata = PackMetadata {
779        pack_id: manifest.meta.pack_id.clone(),
780        version: manifest.meta.version.to_string(),
781        entry_flows,
782        secret_requirements: Vec::new(),
783    };
784
785    Ok(PackFlows {
786        descriptors,
787        flows,
788        metadata,
789    })
790}
791
792fn load_legacy_flows_from_dir(
793    root: &Path,
794    manifest: &legacy_pack::PackManifest,
795) -> Result<PackFlows> {
796    let mut flows = HashMap::new();
797    let mut descriptors = Vec::new();
798
799    for entry in &manifest.flows {
800        let path = root.join(&entry.file_json);
801        let bytes = std::fs::read(&path)
802            .with_context(|| format!("missing flow json {}", path.display()))?;
803        let doc: FlowDoc = serde_json::from_slice(&bytes)
804            .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
805        let normalized = normalize_flow_doc(doc);
806        let flow_ir = flow_doc_to_ir(normalized)?;
807        let flow = flow_ir_to_flow(flow_ir)?;
808
809        descriptors.push(FlowDescriptor {
810            id: entry.id.clone(),
811            flow_type: entry.kind.clone(),
812            profile: manifest.meta.pack_id.clone(),
813            version: manifest.meta.version.to_string(),
814            description: None,
815        });
816        flows.insert(entry.id.clone(), flow);
817    }
818
819    let mut entry_flows = manifest.meta.entry_flows.clone();
820    if entry_flows.is_empty() {
821        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
822    }
823    let metadata = PackMetadata {
824        pack_id: manifest.meta.pack_id.clone(),
825        version: manifest.meta.version.to_string(),
826        entry_flows,
827        secret_requirements: Vec::new(),
828    };
829
830    Ok(PackFlows {
831        descriptors,
832        flows,
833        metadata,
834    })
835}
836
837pub struct ComponentState {
838    pub host: HostState,
839    wasi_ctx: WasiCtx,
840    resource_table: ResourceTable,
841}
842
843impl ComponentState {
844    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
845        let wasi_ctx = policy
846            .instantiate()
847            .context("failed to build WASI context")?;
848        Ok(Self {
849            host,
850            wasi_ctx,
851            resource_table: ResourceTable::new(),
852        })
853    }
854
855    fn host_mut(&mut self) -> &mut HostState {
856        &mut self.host
857    }
858
859    fn should_cancel_host(&mut self) -> bool {
860        false
861    }
862
863    fn yield_now_host(&mut self) {
864        // no-op cooperative yield
865    }
866}
867
868impl component_api::v0_4::greentic::component::control::Host for ComponentState {
869    fn should_cancel(&mut self) -> bool {
870        self.should_cancel_host()
871    }
872
873    fn yield_now(&mut self) {
874        self.yield_now_host();
875    }
876}
877
878impl component_api::v0_5::greentic::component::control::Host for ComponentState {
879    fn should_cancel(&mut self) -> bool {
880        self.should_cancel_host()
881    }
882
883    fn yield_now(&mut self) {
884        self.yield_now_host();
885    }
886}
887
888fn add_component_control_instance(
889    linker: &mut Linker<ComponentState>,
890    name: &str,
891) -> wasmtime::Result<()> {
892    let mut inst = linker.instance(name)?;
893    inst.func_wrap(
894        "should-cancel",
895        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
896            let host = caller.data_mut();
897            Ok((host.should_cancel_host(),))
898        },
899    )?;
900    inst.func_wrap(
901        "yield-now",
902        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
903            let host = caller.data_mut();
904            host.yield_now_host();
905            Ok(())
906        },
907    )?;
908    Ok(())
909}
910
911fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
912    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
913    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
914    Ok(())
915}
916
917pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
918    add_wasi_to_linker(linker)?;
919    add_all_v1_to_linker(
920        linker,
921        HostFns {
922            http_client_v1_1: Some(|state| state.host_mut()),
923            http_client: Some(|state| state.host_mut()),
924            oauth_broker: None,
925            runner_host_http: Some(|state| state.host_mut()),
926            runner_host_kv: Some(|state| state.host_mut()),
927            telemetry_logger: Some(|state| state.host_mut()),
928            state_store: allow_state_store.then_some(|state| state.host_mut()),
929            secrets_store: Some(|state| state.host_mut()),
930        },
931    )?;
932    Ok(())
933}
934
935impl OAuthHostContext for ComponentState {
936    fn tenant_id(&self) -> &str {
937        &self.host.config.tenant
938    }
939
940    fn env(&self) -> &str {
941        &self.host.default_env
942    }
943
944    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
945        &mut self.host.oauth_host
946    }
947
948    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
949        self.host.oauth_config.as_ref()
950    }
951}
952
953impl WasiView for ComponentState {
954    fn ctx(&mut self) -> WasiCtxView<'_> {
955        WasiCtxView {
956            ctx: &mut self.wasi_ctx,
957            table: &mut self.resource_table,
958        }
959    }
960}
961
962#[allow(unsafe_code)]
963unsafe impl Send for ComponentState {}
964#[allow(unsafe_code)]
965unsafe impl Sync for ComponentState {}
966
967impl PackRuntime {
968    fn allows_state_store(&self, component_ref: &str) -> bool {
969        if self.state_store.is_none() {
970            return false;
971        }
972        if !self.config.state_store_policy.allow {
973            return false;
974        }
975        let Some(manifest) = self.component_manifests.get(component_ref) else {
976            return false;
977        };
978        manifest
979            .capabilities
980            .host
981            .state
982            .as_ref()
983            .map(|caps| caps.read || caps.write)
984            .unwrap_or(false)
985    }
986
987    #[allow(clippy::too_many_arguments)]
988    pub async fn load(
989        path: impl AsRef<Path>,
990        config: Arc<HostConfig>,
991        mocks: Option<Arc<MockLayer>>,
992        archive_source: Option<&Path>,
993        session_store: Option<DynSessionStore>,
994        state_store: Option<DynStateStore>,
995        wasi_policy: Arc<RunnerWasiPolicy>,
996        secrets: DynSecretsManager,
997        oauth_config: Option<OAuthBrokerConfig>,
998        verify_archive: bool,
999        component_resolution: ComponentResolution,
1000    ) -> Result<Self> {
1001        let path = path.as_ref();
1002        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1003        let path_meta = std::fs::metadata(&safe_path).ok();
1004        let is_dir = path_meta
1005            .as_ref()
1006            .map(|meta| meta.is_dir())
1007            .unwrap_or(false);
1008        let is_component = !is_dir
1009            && safe_path
1010                .extension()
1011                .and_then(|ext| ext.to_str())
1012                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1013                .unwrap_or(false);
1014        let archive_hint_path = if let Some(source) = archive_source {
1015            let (_, normalized) = normalize_pack_path(source)?;
1016            Some(normalized)
1017        } else if is_component || is_dir {
1018            None
1019        } else {
1020            Some(safe_path.clone())
1021        };
1022        let archive_hint = archive_hint_path.as_deref();
1023        if verify_archive {
1024            if let Some(verify_target) = archive_hint.and_then(|p| {
1025                std::fs::metadata(p)
1026                    .ok()
1027                    .filter(|meta| meta.is_file())
1028                    .map(|_| p)
1029            }) {
1030                verify::verify_pack(verify_target).await?;
1031                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1032            } else {
1033                tracing::debug!("skipping archive verification (no archive source)");
1034            }
1035        }
1036        let engine = Engine::default();
1037        let mut metadata = PackMetadata::fallback(&safe_path);
1038        let mut manifest = None;
1039        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1040        let mut flows = None;
1041        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1042            if is_dir {
1043                Some(safe_path.clone())
1044            } else {
1045                None
1046            }
1047        });
1048
1049        if let Some(root) = materialized_root.as_ref() {
1050            match load_manifest_and_flows_from_dir(root) {
1051                Ok(ManifestLoad::New {
1052                    manifest: m,
1053                    flows: cache,
1054                }) => {
1055                    metadata = cache.metadata.clone();
1056                    manifest = Some(*m);
1057                    flows = Some(cache);
1058                }
1059                Ok(ManifestLoad::Legacy {
1060                    manifest: m,
1061                    flows: cache,
1062                }) => {
1063                    metadata = cache.metadata.clone();
1064                    legacy_manifest = Some(m);
1065                    flows = Some(cache);
1066                }
1067                Err(err) => {
1068                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1069                }
1070            }
1071        }
1072
1073        if manifest.is_none()
1074            && legacy_manifest.is_none()
1075            && let Some(archive_path) = archive_hint
1076        {
1077            match load_manifest_and_flows(archive_path) {
1078                Ok(ManifestLoad::New {
1079                    manifest: m,
1080                    flows: cache,
1081                }) => {
1082                    metadata = cache.metadata.clone();
1083                    manifest = Some(*m);
1084                    flows = Some(cache);
1085                }
1086                Ok(ManifestLoad::Legacy {
1087                    manifest: m,
1088                    flows: cache,
1089                }) => {
1090                    metadata = cache.metadata.clone();
1091                    legacy_manifest = Some(m);
1092                    flows = Some(cache);
1093                }
1094                Err(err) => {
1095                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1096                }
1097            }
1098        }
1099        let component_sources_payload = if let Some(manifest) = manifest.as_ref() {
1100            manifest
1101                .get_component_sources_v1()
1102                .context("invalid component sources extension")?
1103        } else {
1104            None
1105        };
1106        let component_sources = component_sources_table(component_sources_payload.as_ref())?;
1107        let components = if is_component {
1108            let wasm_bytes = fs::read(&safe_path).await?;
1109            metadata = PackMetadata::from_wasm(&wasm_bytes)
1110                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1111            let name = safe_path
1112                .file_stem()
1113                .map(|s| s.to_string_lossy().to_string())
1114                .unwrap_or_else(|| "component".to_string());
1115            let component = Component::from_binary(&engine, &wasm_bytes)?;
1116            let mut map = HashMap::new();
1117            map.insert(
1118                name.clone(),
1119                PackComponent {
1120                    name,
1121                    version: metadata.version.clone(),
1122                    component,
1123                },
1124            );
1125            map
1126        } else {
1127            let specs = component_specs(
1128                manifest.as_ref(),
1129                legacy_manifest.as_deref(),
1130                component_sources_payload.as_ref(),
1131            );
1132            if specs.is_empty() {
1133                HashMap::new()
1134            } else {
1135                let mut loaded = HashMap::new();
1136                let mut missing: HashSet<String> =
1137                    specs.iter().map(|spec| spec.id.clone()).collect();
1138                let mut searched = Vec::new();
1139
1140                if !component_resolution.overrides.is_empty() {
1141                    load_components_from_overrides(
1142                        &engine,
1143                        &component_resolution.overrides,
1144                        &specs,
1145                        &mut missing,
1146                        &mut loaded,
1147                    )?;
1148                    searched.push("override map".to_string());
1149                }
1150
1151                if let Some(component_sources) = component_sources.as_ref() {
1152                    load_components_from_sources(
1153                        &engine,
1154                        component_sources,
1155                        &component_resolution,
1156                        &specs,
1157                        &mut missing,
1158                        &mut loaded,
1159                        materialized_root.as_deref(),
1160                        archive_hint,
1161                    )
1162                    .await?;
1163                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1164                }
1165
1166                if let Some(root) = materialized_root.as_ref() {
1167                    load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
1168                    searched.push(format!("components dir {}", root.display()));
1169                }
1170
1171                if let Some(archive_path) = archive_hint {
1172                    load_components_from_archive(
1173                        &engine,
1174                        archive_path,
1175                        &specs,
1176                        &mut missing,
1177                        &mut loaded,
1178                    )?;
1179                    searched.push(format!("archive {}", archive_path.display()));
1180                }
1181
1182                if !missing.is_empty() {
1183                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1184                    let sources = if searched.is_empty() {
1185                        "no component sources".to_string()
1186                    } else {
1187                        searched.join(", ")
1188                    };
1189                    bail!(
1190                        "components missing: {}; looked in {}",
1191                        missing_list,
1192                        sources
1193                    );
1194                }
1195
1196                loaded
1197            }
1198        };
1199        let http_client = Arc::clone(&HTTP_CLIENT);
1200        let mut component_manifests = HashMap::new();
1201        if let Some(manifest) = manifest.as_ref() {
1202            for component in &manifest.components {
1203                component_manifests.insert(component.id.as_str().to_string(), component.clone());
1204            }
1205        }
1206        Ok(Self {
1207            path: safe_path,
1208            archive_path: archive_hint.map(Path::to_path_buf),
1209            config,
1210            engine,
1211            metadata,
1212            manifest,
1213            legacy_manifest,
1214            component_manifests,
1215            mocks,
1216            flows,
1217            components,
1218            http_client,
1219            pre_cache: Mutex::new(HashMap::new()),
1220            session_store,
1221            state_store,
1222            wasi_policy,
1223            provider_registry: RwLock::new(None),
1224            secrets,
1225            oauth_config,
1226        })
1227    }
1228
1229    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1230        if let Some(cache) = &self.flows {
1231            return Ok(cache.descriptors.clone());
1232        }
1233        if let Some(manifest) = &self.manifest {
1234            let descriptors = manifest
1235                .flows
1236                .iter()
1237                .map(|flow| FlowDescriptor {
1238                    id: flow.id.as_str().to_string(),
1239                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1240                    profile: manifest.pack_id.as_str().to_string(),
1241                    version: manifest.version.to_string(),
1242                    description: None,
1243                })
1244                .collect();
1245            return Ok(descriptors);
1246        }
1247        Ok(Vec::new())
1248    }
1249
1250    #[allow(dead_code)]
1251    pub async fn run_flow(
1252        &self,
1253        flow_id: &str,
1254        input: serde_json::Value,
1255    ) -> Result<serde_json::Value> {
1256        let pack = Arc::new(
1257            PackRuntime::load(
1258                &self.path,
1259                Arc::clone(&self.config),
1260                self.mocks.clone(),
1261                self.archive_path.as_deref(),
1262                self.session_store.clone(),
1263                self.state_store.clone(),
1264                Arc::clone(&self.wasi_policy),
1265                self.secrets.clone(),
1266                self.oauth_config.clone(),
1267                false,
1268                ComponentResolution::default(),
1269            )
1270            .await?,
1271        );
1272
1273        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1274        let retry_config = self.config.retry_config().into();
1275        let mocks = pack.mocks.as_deref();
1276        let tenant = self.config.tenant.as_str();
1277
1278        let ctx = FlowContext {
1279            tenant,
1280            flow_id,
1281            node_id: None,
1282            tool: None,
1283            action: None,
1284            session_id: None,
1285            provider_id: None,
1286            retry_config,
1287            observer: None,
1288            mocks,
1289        };
1290
1291        let execution = engine.execute(ctx, input).await?;
1292        match execution.status {
1293            FlowStatus::Completed => Ok(execution.output),
1294            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1295                "status": "pending",
1296                "reason": wait.reason,
1297                "resume": wait.snapshot,
1298                "response": execution.output,
1299            })),
1300        }
1301    }
1302
1303    pub async fn invoke_component(
1304        &self,
1305        component_ref: &str,
1306        ctx: ComponentExecCtx,
1307        operation: &str,
1308        _config_json: Option<String>,
1309        input_json: String,
1310    ) -> Result<Value> {
1311        let pack_component = self
1312            .components
1313            .get(component_ref)
1314            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1315
1316        let mut linker = Linker::new(&self.engine);
1317        let allow_state_store = self.allows_state_store(component_ref);
1318        register_all(&mut linker, allow_state_store)?;
1319        add_component_control_to_linker(&mut linker)?;
1320
1321        let host_state = HostState::new(
1322            Arc::clone(&self.config),
1323            Arc::clone(&self.http_client),
1324            self.mocks.clone(),
1325            self.session_store.clone(),
1326            self.state_store.clone(),
1327            Arc::clone(&self.secrets),
1328            self.oauth_config.clone(),
1329            Some(ctx.clone()),
1330        )?;
1331        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1332        let mut store = wasmtime::Store::new(&self.engine, store_state);
1333        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1334        let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1335            Ok(pre) => {
1336                let bindings = pre.instantiate_async(&mut store).await?;
1337                let node = bindings.greentic_component_node();
1338                let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1339                let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1340                component_api::invoke_result_from_v0_5(result)
1341            }
1342            Err(err) => {
1343                if is_missing_node_export(&err, "0.5.0") {
1344                    let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1345                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
1346                        component_api::v0_4::ComponentPre::new(pre_instance)?;
1347                    let bindings = pre.instantiate_async(&mut store).await?;
1348                    let node = bindings.greentic_component_node();
1349                    let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1350                    let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1351                    component_api::invoke_result_from_v0_4(result)
1352                } else {
1353                    return Err(err);
1354                }
1355            }
1356        };
1357
1358        match result {
1359            InvokeResult::Ok(body) => {
1360                if body.is_empty() {
1361                    return Ok(Value::Null);
1362                }
1363                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1364            }
1365            InvokeResult::Err(NodeError {
1366                code,
1367                message,
1368                retryable,
1369                backoff_ms,
1370                details,
1371            }) => {
1372                let mut obj = serde_json::Map::new();
1373                obj.insert("ok".into(), Value::Bool(false));
1374                let mut error = serde_json::Map::new();
1375                error.insert("code".into(), Value::String(code));
1376                error.insert("message".into(), Value::String(message));
1377                error.insert("retryable".into(), Value::Bool(retryable));
1378                if let Some(backoff) = backoff_ms {
1379                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1380                }
1381                if let Some(details) = details {
1382                    error.insert(
1383                        "details".into(),
1384                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1385                    );
1386                }
1387                obj.insert("error".into(), Value::Object(error));
1388                Ok(Value::Object(obj))
1389            }
1390        }
1391    }
1392
1393    pub fn resolve_provider(
1394        &self,
1395        provider_id: Option<&str>,
1396        provider_type: Option<&str>,
1397    ) -> Result<ProviderBinding> {
1398        let registry = self.provider_registry()?;
1399        registry.resolve(provider_id, provider_type)
1400    }
1401
1402    pub async fn invoke_provider(
1403        &self,
1404        binding: &ProviderBinding,
1405        ctx: ComponentExecCtx,
1406        op: &str,
1407        input_json: Vec<u8>,
1408    ) -> Result<Value> {
1409        let component_ref = &binding.component_ref;
1410        let pack_component = self
1411            .components
1412            .get(component_ref)
1413            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1414
1415        let mut linker = Linker::new(&self.engine);
1416        let allow_state_store = self.allows_state_store(component_ref);
1417        register_all(&mut linker, allow_state_store)?;
1418        add_component_control_to_linker(&mut linker)?;
1419        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1420        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1421
1422        let host_state = HostState::new(
1423            Arc::clone(&self.config),
1424            Arc::clone(&self.http_client),
1425            self.mocks.clone(),
1426            self.session_store.clone(),
1427            self.state_store.clone(),
1428            Arc::clone(&self.secrets),
1429            self.oauth_config.clone(),
1430            Some(ctx),
1431        )?;
1432        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1433        let mut store = wasmtime::Store::new(&self.engine, store_state);
1434        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1435        let provider = bindings.greentic_provider_core_schema_core_api();
1436
1437        let result = provider.call_invoke(&mut store, op, &input_json)?;
1438        deserialize_json_bytes(result)
1439    }
1440
1441    fn provider_registry(&self) -> Result<ProviderRegistry> {
1442        if let Some(registry) = self.provider_registry.read().clone() {
1443            return Ok(registry);
1444        }
1445        let manifest = self
1446            .manifest
1447            .as_ref()
1448            .context("pack manifest required for provider resolution")?;
1449        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1450        let registry = ProviderRegistry::new(
1451            manifest,
1452            self.state_store.clone(),
1453            &self.config.tenant,
1454            &env,
1455        )?;
1456        *self.provider_registry.write() = Some(registry.clone());
1457        Ok(registry)
1458    }
1459
1460    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1461        if let Some(cache) = &self.flows {
1462            return cache
1463                .flows
1464                .get(flow_id)
1465                .cloned()
1466                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1467        }
1468        if let Some(manifest) = &self.manifest {
1469            let entry = manifest
1470                .flows
1471                .iter()
1472                .find(|f| f.id.as_str() == flow_id)
1473                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1474            return Ok(entry.flow.clone());
1475        }
1476        bail!("flow '{flow_id}' not available (pack exports disabled)")
1477    }
1478
1479    pub fn metadata(&self) -> &PackMetadata {
1480        &self.metadata
1481    }
1482
1483    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1484        &self.metadata.secret_requirements
1485    }
1486
1487    pub fn missing_secrets(
1488        &self,
1489        tenant_ctx: &TypesTenantCtx,
1490    ) -> Vec<greentic_types::SecretRequirement> {
1491        let env = tenant_ctx.env.as_str().to_string();
1492        let tenant = tenant_ctx.tenant.as_str().to_string();
1493        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1494        self.required_secrets()
1495            .iter()
1496            .filter(|req| {
1497                // scope must match current context if provided
1498                if let Some(scope) = &req.scope {
1499                    if scope.env != env {
1500                        return false;
1501                    }
1502                    if scope.tenant != tenant {
1503                        return false;
1504                    }
1505                    if let Some(ref team_req) = scope.team
1506                        && team.as_ref() != Some(team_req)
1507                    {
1508                        return false;
1509                    }
1510                }
1511                read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1512            })
1513            .cloned()
1514            .collect()
1515    }
1516
1517    pub fn for_component_test(
1518        components: Vec<(String, PathBuf)>,
1519        flows: HashMap<String, FlowIR>,
1520        config: Arc<HostConfig>,
1521    ) -> Result<Self> {
1522        let engine = Engine::default();
1523        let mut component_map = HashMap::new();
1524        for (name, path) in components {
1525            if !path.exists() {
1526                bail!("component artifact missing: {}", path.display());
1527            }
1528            let wasm_bytes = std::fs::read(&path)?;
1529            let component = Component::from_binary(&engine, &wasm_bytes)
1530                .with_context(|| format!("failed to compile component {}", path.display()))?;
1531            component_map.insert(
1532                name.clone(),
1533                PackComponent {
1534                    name,
1535                    version: "0.0.0".into(),
1536                    component,
1537                },
1538            );
1539        }
1540
1541        let mut flow_map = HashMap::new();
1542        let mut descriptors = Vec::new();
1543        for (id, ir) in flows {
1544            let flow_type = ir.flow_type.clone();
1545            let flow = flow_ir_to_flow(ir)?;
1546            flow_map.insert(id.clone(), flow);
1547            descriptors.push(FlowDescriptor {
1548                id: id.clone(),
1549                flow_type,
1550                profile: "test".into(),
1551                version: "0.0.0".into(),
1552                description: None,
1553            });
1554        }
1555        let flows_cache = PackFlows {
1556            descriptors: descriptors.clone(),
1557            flows: flow_map,
1558            metadata: PackMetadata::fallback(Path::new("component-test")),
1559        };
1560
1561        Ok(Self {
1562            path: PathBuf::new(),
1563            archive_path: None,
1564            config,
1565            engine,
1566            metadata: PackMetadata::fallback(Path::new("component-test")),
1567            manifest: None,
1568            legacy_manifest: None,
1569            component_manifests: HashMap::new(),
1570            mocks: None,
1571            flows: Some(flows_cache),
1572            components: component_map,
1573            http_client: Arc::clone(&HTTP_CLIENT),
1574            pre_cache: Mutex::new(HashMap::new()),
1575            session_store: None,
1576            state_store: None,
1577            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1578            provider_registry: RwLock::new(None),
1579            secrets: crate::secrets::default_manager(),
1580            oauth_config: None,
1581        })
1582    }
1583}
1584
1585fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1586    let message = err.to_string();
1587    message.contains("no exported instance named")
1588        && message.contains(&format!("greentic:component/node@{version}"))
1589}
1590
1591struct PackFlows {
1592    descriptors: Vec<FlowDescriptor>,
1593    flows: HashMap<String, Flow>,
1594    metadata: PackMetadata,
1595}
1596
1597const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
1598    "greentic.pack.runtime_flow",
1599    "greentic.pack.flow_runtime",
1600    "greentic.pack.runtime_flows",
1601];
1602
1603#[derive(Debug, Deserialize)]
1604struct RuntimeFlowBundle {
1605    flows: Vec<RuntimeFlow>,
1606}
1607
1608#[derive(Debug, Deserialize)]
1609struct RuntimeFlow {
1610    id: String,
1611    #[serde(alias = "flow_type")]
1612    kind: FlowKind,
1613    #[serde(default)]
1614    schema_version: Option<String>,
1615    #[serde(default)]
1616    start: Option<String>,
1617    #[serde(default)]
1618    entrypoints: BTreeMap<String, Value>,
1619    nodes: BTreeMap<String, RuntimeNode>,
1620    #[serde(default)]
1621    metadata: Option<FlowMetadata>,
1622}
1623
1624#[derive(Debug, Deserialize)]
1625struct RuntimeNode {
1626    #[serde(alias = "component")]
1627    component_id: String,
1628    #[serde(default, alias = "operation")]
1629    operation_name: Option<String>,
1630    #[serde(default, alias = "payload", alias = "input")]
1631    operation_payload: Value,
1632    #[serde(default)]
1633    routing: Option<Routing>,
1634    #[serde(default)]
1635    telemetry: Option<TelemetryHints>,
1636}
1637
1638fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1639    if bytes.is_empty() {
1640        return Ok(Value::Null);
1641    }
1642    serde_json::from_slice(&bytes).or_else(|_| {
1643        String::from_utf8(bytes)
1644            .map(Value::String)
1645            .map_err(|err| anyhow!(err))
1646    })
1647}
1648
1649impl PackFlows {
1650    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1651        if let Some(flows) = flows_from_runtime_extension(&manifest) {
1652            return flows;
1653        }
1654        let descriptors = manifest
1655            .flows
1656            .iter()
1657            .map(|entry| FlowDescriptor {
1658                id: entry.id.as_str().to_string(),
1659                flow_type: flow_kind_to_str(entry.kind).to_string(),
1660                profile: manifest.pack_id.as_str().to_string(),
1661                version: manifest.version.to_string(),
1662                description: None,
1663            })
1664            .collect();
1665        let mut flows = HashMap::new();
1666        for entry in &manifest.flows {
1667            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1668        }
1669        Self {
1670            metadata: PackMetadata::from_manifest(&manifest),
1671            descriptors,
1672            flows,
1673        }
1674    }
1675}
1676
1677fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1678    let extensions = manifest.extensions.as_ref()?;
1679    let extension = extensions.iter().find_map(|(key, ext)| {
1680        if RUNTIME_FLOW_EXTENSION_IDS
1681            .iter()
1682            .any(|candidate| candidate == key)
1683        {
1684            Some(ext)
1685        } else {
1686            None
1687        }
1688    })?;
1689    let runtime_flows = match decode_runtime_flow_extension(extension) {
1690        Some(flows) if !flows.is_empty() => flows,
1691        _ => return None,
1692    };
1693
1694    let descriptors = runtime_flows
1695        .iter()
1696        .map(|flow| FlowDescriptor {
1697            id: flow.id.as_str().to_string(),
1698            flow_type: flow_kind_to_str(flow.kind).to_string(),
1699            profile: manifest.pack_id.as_str().to_string(),
1700            version: manifest.version.to_string(),
1701            description: None,
1702        })
1703        .collect::<Vec<_>>();
1704    let flows = runtime_flows
1705        .into_iter()
1706        .map(|flow| (flow.id.as_str().to_string(), flow))
1707        .collect();
1708
1709    Some(PackFlows {
1710        metadata: PackMetadata::from_manifest(manifest),
1711        descriptors,
1712        flows,
1713    })
1714}
1715
1716fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1717    let value = match extension.inline.as_ref()? {
1718        ExtensionInline::Other(value) => value.clone(),
1719        _ => return None,
1720    };
1721
1722    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1723        return Some(collect_runtime_flows(bundle.flows));
1724    }
1725
1726    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1727        return Some(collect_runtime_flows(flows));
1728    }
1729
1730    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1731        return Some(flows);
1732    }
1733
1734    warn!(
1735        extension = %extension.kind,
1736        version = %extension.version,
1737        "runtime flow extension present but could not be decoded"
1738    );
1739    None
1740}
1741
1742fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1743    flows
1744        .into_iter()
1745        .filter_map(|flow| match runtime_flow_to_flow(flow) {
1746            Ok(flow) => Some(flow),
1747            Err(err) => {
1748                warn!(error = %err, "failed to decode runtime flow");
1749                None
1750            }
1751        })
1752        .collect()
1753}
1754
1755fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1756    let flow_id = FlowId::from_str(&runtime.id)
1757        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1758    let mut entrypoints = runtime.entrypoints;
1759    if entrypoints.is_empty()
1760        && let Some(start) = &runtime.start
1761    {
1762        entrypoints.insert("default".into(), Value::String(start.clone()));
1763    }
1764
1765    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1766    for (id, node) in runtime.nodes {
1767        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1768        let component_id = ComponentId::from_str(&node.component_id)
1769            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1770        let component = FlowComponentRef {
1771            id: component_id,
1772            pack_alias: None,
1773            operation: node.operation_name,
1774        };
1775        let routing = node.routing.unwrap_or(Routing::End);
1776        let telemetry = node.telemetry.unwrap_or_default();
1777        nodes.insert(
1778            node_id.clone(),
1779            Node {
1780                id: node_id,
1781                component,
1782                input: InputMapping {
1783                    mapping: node.operation_payload,
1784                },
1785                output: OutputMapping {
1786                    mapping: Value::Null,
1787                },
1788                routing,
1789                telemetry,
1790            },
1791        );
1792    }
1793
1794    Ok(Flow {
1795        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1796        id: flow_id,
1797        kind: runtime.kind,
1798        entrypoints,
1799        nodes,
1800        metadata: runtime.metadata.unwrap_or_default(),
1801    })
1802}
1803
1804fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1805    match kind {
1806        greentic_types::FlowKind::Messaging => "messaging",
1807        greentic_types::FlowKind::Event => "event",
1808        greentic_types::FlowKind::ComponentConfig => "component-config",
1809        greentic_types::FlowKind::Job => "job",
1810        greentic_types::FlowKind::Http => "http",
1811    }
1812}
1813
1814fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1815    let mut file = archive
1816        .by_name(name)
1817        .with_context(|| format!("entry {name} missing from archive"))?;
1818    let mut buf = Vec::new();
1819    file.read_to_end(&mut buf)?;
1820    Ok(buf)
1821}
1822
1823fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1824    for node in doc.nodes.values_mut() {
1825        let Some((component_ref, payload)) = node
1826            .raw
1827            .iter()
1828            .next()
1829            .map(|(key, value)| (key.clone(), value.clone()))
1830        else {
1831            continue;
1832        };
1833        if component_ref.starts_with("emit.") {
1834            node.operation = Some(component_ref);
1835            node.payload = payload;
1836            node.raw.clear();
1837            continue;
1838        }
1839        let (target_component, operation, input, config) =
1840            infer_component_exec(&payload, &component_ref);
1841        let mut payload_obj = serde_json::Map::new();
1842        // component.exec is meta; ensure the payload carries the actual target component.
1843        payload_obj.insert("component".into(), Value::String(target_component));
1844        payload_obj.insert("operation".into(), Value::String(operation));
1845        payload_obj.insert("input".into(), input);
1846        if let Some(cfg) = config {
1847            payload_obj.insert("config".into(), cfg);
1848        }
1849        node.operation = Some("component.exec".to_string());
1850        node.payload = Value::Object(payload_obj);
1851        node.raw.clear();
1852    }
1853    doc
1854}
1855
1856fn infer_component_exec(
1857    payload: &Value,
1858    component_ref: &str,
1859) -> (String, String, Value, Option<Value>) {
1860    let default_op = if component_ref.starts_with("templating.") {
1861        "render"
1862    } else {
1863        "invoke"
1864    }
1865    .to_string();
1866
1867    if let Value::Object(map) = payload {
1868        let op = map
1869            .get("op")
1870            .or_else(|| map.get("operation"))
1871            .and_then(Value::as_str)
1872            .map(|s| s.to_string())
1873            .unwrap_or_else(|| default_op.clone());
1874
1875        let mut input = map.clone();
1876        let config = input.remove("config");
1877        let component = input
1878            .get("component")
1879            .or_else(|| input.get("component_ref"))
1880            .and_then(Value::as_str)
1881            .map(|s| s.to_string())
1882            .unwrap_or_else(|| component_ref.to_string());
1883        input.remove("component");
1884        input.remove("component_ref");
1885        input.remove("op");
1886        input.remove("operation");
1887        return (component, op, Value::Object(input), config);
1888    }
1889
1890    (component_ref.to_string(), default_op, payload.clone(), None)
1891}
1892
1893#[derive(Clone, Debug)]
1894struct ComponentSpec {
1895    id: String,
1896    version: String,
1897    legacy_path: Option<String>,
1898}
1899
1900#[derive(Clone, Debug)]
1901struct ComponentSourceInfo {
1902    digest: String,
1903    source: ComponentSourceRef,
1904    artifact: ComponentArtifactLocation,
1905}
1906
1907#[derive(Clone, Debug)]
1908enum ComponentArtifactLocation {
1909    Inline { wasm_path: String },
1910    Remote,
1911}
1912
1913fn component_specs(
1914    manifest: Option<&greentic_types::PackManifest>,
1915    legacy_manifest: Option<&legacy_pack::PackManifest>,
1916    component_sources: Option<&ComponentSourcesV1>,
1917) -> Vec<ComponentSpec> {
1918    if let Some(manifest) = manifest {
1919        if !manifest.components.is_empty() {
1920            return manifest
1921                .components
1922                .iter()
1923                .map(|entry| ComponentSpec {
1924                    id: entry.id.as_str().to_string(),
1925                    version: entry.version.to_string(),
1926                    legacy_path: None,
1927                })
1928                .collect();
1929        }
1930        if let Some(sources) = component_sources {
1931            let mut seen = HashSet::new();
1932            let mut specs = Vec::new();
1933            for entry in &sources.components {
1934                let id = entry
1935                    .component_id
1936                    .as_ref()
1937                    .map(|id| id.as_str())
1938                    .unwrap_or(entry.name.as_str());
1939                if seen.insert(id.to_string()) {
1940                    specs.push(ComponentSpec {
1941                        id: id.to_string(),
1942                        version: "0.0.0".to_string(),
1943                        legacy_path: None,
1944                    });
1945                }
1946            }
1947            return specs;
1948        }
1949    }
1950    if let Some(legacy_manifest) = legacy_manifest {
1951        return legacy_manifest
1952            .components
1953            .iter()
1954            .map(|entry| ComponentSpec {
1955                id: entry.name.clone(),
1956                version: entry.version.to_string(),
1957                legacy_path: Some(entry.file_wasm.clone()),
1958            })
1959            .collect();
1960    }
1961    Vec::new()
1962}
1963
1964fn component_sources_table(
1965    sources: Option<&ComponentSourcesV1>,
1966) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
1967    let Some(sources) = sources else {
1968        return Ok(None);
1969    };
1970    let mut table = HashMap::new();
1971    for entry in &sources.components {
1972        let artifact = match &entry.artifact {
1973            ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
1974                wasm_path: wasm_path.clone(),
1975            },
1976            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
1977        };
1978        let info = ComponentSourceInfo {
1979            digest: entry.resolved.digest.clone(),
1980            source: entry.source.clone(),
1981            artifact,
1982        };
1983        if let Some(component_id) = entry.component_id.as_ref() {
1984            table.insert(component_id.as_str().to_string(), info.clone());
1985        }
1986        table.insert(entry.name.clone(), info);
1987    }
1988    Ok(Some(table))
1989}
1990
1991fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
1992    if let Some(path) = &spec.legacy_path {
1993        return root.join(path);
1994    }
1995    root.join("components").join(format!("{}.wasm", spec.id))
1996}
1997
1998fn normalize_digest(digest: &str) -> String {
1999    if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
2000        digest.to_string()
2001    } else {
2002        format!("sha256:{digest}")
2003    }
2004}
2005
2006fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2007    if digest.starts_with("blake3:") {
2008        let hash = blake3::hash(bytes);
2009        return Ok(format!("blake3:{}", hash.to_hex()));
2010    }
2011    let mut hasher = sha2::Sha256::new();
2012    hasher.update(bytes);
2013    Ok(format!("sha256:{:x}", hasher.finalize()))
2014}
2015
2016fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2017    let normalized_expected = normalize_digest(expected);
2018    let actual = compute_digest_for(bytes, &normalized_expected)?;
2019    if normalize_digest(&actual) != normalized_expected {
2020        bail!(
2021            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2022        );
2023    }
2024    Ok(())
2025}
2026
2027fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
2028    let mut opts = DistOptions {
2029        allow_tags: true,
2030        ..DistOptions::default()
2031    };
2032    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2033        opts.cache_dir = cache_dir;
2034    }
2035    if component_resolution.dist_offline {
2036        opts.offline = true;
2037    }
2038    opts
2039}
2040
2041#[allow(clippy::too_many_arguments)]
2042async fn load_components_from_sources(
2043    engine: &Engine,
2044    component_sources: &HashMap<String, ComponentSourceInfo>,
2045    component_resolution: &ComponentResolution,
2046    specs: &[ComponentSpec],
2047    missing: &mut HashSet<String>,
2048    into: &mut HashMap<String, PackComponent>,
2049    materialized_root: Option<&Path>,
2050    archive_hint: Option<&Path>,
2051) -> Result<()> {
2052    let mut archive = if let Some(path) = archive_hint {
2053        Some(
2054            ZipArchive::new(File::open(path)?)
2055                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
2056        )
2057    } else {
2058        None
2059    };
2060    let mut dist_client: Option<DistClient> = None;
2061
2062    for spec in specs {
2063        if !missing.contains(&spec.id) {
2064            continue;
2065        }
2066        let Some(source) = component_sources.get(&spec.id) else {
2067            continue;
2068        };
2069
2070        let bytes = match &source.artifact {
2071            ComponentArtifactLocation::Inline { wasm_path } => {
2072                if let Some(root) = materialized_root {
2073                    let path = root.join(wasm_path);
2074                    if path.exists() {
2075                        std::fs::read(&path).with_context(|| {
2076                            format!(
2077                                "failed to read inline component {} from {}",
2078                                spec.id,
2079                                path.display()
2080                            )
2081                        })?
2082                    } else if archive.is_none() {
2083                        bail!("inline component {} missing at {}", spec.id, path.display());
2084                    } else {
2085                        read_entry(
2086                            archive.as_mut().expect("archive present when needed"),
2087                            wasm_path,
2088                        )
2089                        .with_context(|| {
2090                            format!(
2091                                "inline component {} missing at {} in pack archive",
2092                                spec.id, wasm_path
2093                            )
2094                        })?
2095                    }
2096                } else if let Some(archive) = archive.as_mut() {
2097                    read_entry(archive, wasm_path).with_context(|| {
2098                        format!(
2099                            "inline component {} missing at {} in pack archive",
2100                            spec.id, wasm_path
2101                        )
2102                    })?
2103                } else {
2104                    bail!(
2105                        "inline component {} missing and no pack source available",
2106                        spec.id
2107                    );
2108                }
2109            }
2110            ComponentArtifactLocation::Remote => {
2111                let client = dist_client.get_or_insert_with(|| {
2112                    DistClient::new(dist_options_from(component_resolution))
2113                });
2114                let reference = source.source.to_string();
2115                let cache_path = if component_resolution.dist_offline {
2116                    client
2117                        .fetch_digest(&source.digest)
2118                        .await
2119                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2120                } else {
2121                    let resolved = client
2122                        .resolve_ref(&reference)
2123                        .await
2124                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2125                    let expected = normalize_digest(&source.digest);
2126                    let actual = normalize_digest(&resolved.digest);
2127                    if expected != actual {
2128                        bail!(
2129                            "component {} digest mismatch after fetch: expected {}, got {}",
2130                            spec.id,
2131                            expected,
2132                            actual
2133                        );
2134                    }
2135                    resolved.cache_path.ok_or_else(|| {
2136                        anyhow!(
2137                            "component {} resolved from {} but cache path is missing",
2138                            spec.id,
2139                            reference
2140                        )
2141                    })?
2142                };
2143                std::fs::read(&cache_path).with_context(|| {
2144                    format!(
2145                        "failed to read cached component {} from {}",
2146                        spec.id,
2147                        cache_path.display()
2148                    )
2149                })?
2150            }
2151        };
2152
2153        verify_component_digest(&spec.id, &source.digest, &bytes)?;
2154        let component = Component::from_binary(engine, &bytes)
2155            .with_context(|| format!("failed to compile component {}", spec.id))?;
2156        into.insert(
2157            spec.id.clone(),
2158            PackComponent {
2159                name: spec.id.clone(),
2160                version: spec.version.clone(),
2161                component,
2162            },
2163        );
2164        missing.remove(&spec.id);
2165    }
2166
2167    Ok(())
2168}
2169
2170fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2171    match err {
2172        DistError::CacheMiss { reference: missing } => anyhow!(
2173            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2174            component_id,
2175            missing,
2176            reference
2177        ),
2178        DistError::Offline { reference: blocked } => anyhow!(
2179            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2180            component_id,
2181            blocked,
2182            reference
2183        ),
2184        DistError::AuthRequired { target } => anyhow!(
2185            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2186            component_id,
2187            target,
2188            reference
2189        ),
2190        other => anyhow!(
2191            "failed to resolve component {} from {}: {}",
2192            component_id,
2193            reference,
2194            other
2195        ),
2196    }
2197}
2198
2199fn load_components_from_overrides(
2200    engine: &Engine,
2201    overrides: &HashMap<String, PathBuf>,
2202    specs: &[ComponentSpec],
2203    missing: &mut HashSet<String>,
2204    into: &mut HashMap<String, PackComponent>,
2205) -> Result<()> {
2206    for spec in specs {
2207        if !missing.contains(&spec.id) {
2208            continue;
2209        }
2210        let Some(path) = overrides.get(&spec.id) else {
2211            continue;
2212        };
2213        let bytes = std::fs::read(path)
2214            .with_context(|| format!("failed to read override component {}", path.display()))?;
2215        let component = Component::from_binary(engine, &bytes).with_context(|| {
2216            format!(
2217                "failed to compile component {} from override {}",
2218                spec.id,
2219                path.display()
2220            )
2221        })?;
2222        into.insert(
2223            spec.id.clone(),
2224            PackComponent {
2225                name: spec.id.clone(),
2226                version: spec.version.clone(),
2227                component,
2228            },
2229        );
2230        missing.remove(&spec.id);
2231    }
2232    Ok(())
2233}
2234
2235fn load_components_from_dir(
2236    engine: &Engine,
2237    root: &Path,
2238    specs: &[ComponentSpec],
2239    missing: &mut HashSet<String>,
2240    into: &mut HashMap<String, PackComponent>,
2241) -> Result<()> {
2242    for spec in specs {
2243        if !missing.contains(&spec.id) {
2244            continue;
2245        }
2246        let path = component_path_for_spec(root, spec);
2247        if !path.exists() {
2248            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2249            continue;
2250        }
2251        let bytes = std::fs::read(&path)
2252            .with_context(|| format!("failed to read component {}", path.display()))?;
2253        let component = Component::from_binary(engine, &bytes).with_context(|| {
2254            format!(
2255                "failed to compile component {} from {}",
2256                spec.id,
2257                path.display()
2258            )
2259        })?;
2260        into.insert(
2261            spec.id.clone(),
2262            PackComponent {
2263                name: spec.id.clone(),
2264                version: spec.version.clone(),
2265                component,
2266            },
2267        );
2268        missing.remove(&spec.id);
2269    }
2270    Ok(())
2271}
2272
2273fn load_components_from_archive(
2274    engine: &Engine,
2275    path: &Path,
2276    specs: &[ComponentSpec],
2277    missing: &mut HashSet<String>,
2278    into: &mut HashMap<String, PackComponent>,
2279) -> Result<()> {
2280    let mut archive = ZipArchive::new(File::open(path)?)
2281        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2282    for spec in specs {
2283        if !missing.contains(&spec.id) {
2284            continue;
2285        }
2286        let file_name = spec
2287            .legacy_path
2288            .clone()
2289            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2290        let bytes = match read_entry(&mut archive, &file_name) {
2291            Ok(bytes) => bytes,
2292            Err(err) => {
2293                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2294                continue;
2295            }
2296        };
2297        let component = Component::from_binary(engine, &bytes)
2298            .with_context(|| format!("failed to compile component {}", spec.id))?;
2299        into.insert(
2300            spec.id.clone(),
2301            PackComponent {
2302                name: spec.id.clone(),
2303                version: spec.version.clone(),
2304                component,
2305            },
2306        );
2307        missing.remove(&spec.id);
2308    }
2309    Ok(())
2310}
2311
2312#[cfg(test)]
2313mod tests {
2314    use super::*;
2315    use greentic_flow::model::{FlowDoc, NodeDoc};
2316    use indexmap::IndexMap;
2317    use serde_json::json;
2318
2319    #[test]
2320    fn normalizes_raw_component_to_component_exec() {
2321        let mut nodes = IndexMap::new();
2322        let mut raw = IndexMap::new();
2323        raw.insert(
2324            "templating.handlebars".into(),
2325            json!({ "template": "Hi {{name}}" }),
2326        );
2327        nodes.insert(
2328            "start".into(),
2329            NodeDoc {
2330                raw,
2331                routing: json!([{"out": true}]),
2332                ..Default::default()
2333            },
2334        );
2335        let doc = FlowDoc {
2336            id: "welcome".into(),
2337            title: None,
2338            description: None,
2339            flow_type: "messaging".into(),
2340            start: Some("start".into()),
2341            parameters: json!({}),
2342            tags: Vec::new(),
2343            schema_version: None,
2344            entrypoints: IndexMap::new(),
2345            nodes,
2346        };
2347
2348        let normalized = normalize_flow_doc(doc);
2349        let node = normalized.nodes.get("start").expect("node exists");
2350        assert_eq!(node.operation.as_deref(), Some("component.exec"));
2351        assert!(node.raw.is_empty());
2352        let payload = node.payload.as_object().expect("payload object");
2353        assert_eq!(
2354            payload.get("component"),
2355            Some(&Value::String("templating.handlebars".into()))
2356        );
2357        assert_eq!(
2358            payload.get("operation"),
2359            Some(&Value::String("render".into()))
2360        );
2361        let input = payload.get("input").unwrap();
2362        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
2363    }
2364}
2365
2366#[derive(Clone, Debug, Default, Serialize, Deserialize)]
2367pub struct PackMetadata {
2368    pub pack_id: String,
2369    pub version: String,
2370    #[serde(default)]
2371    pub entry_flows: Vec<String>,
2372    #[serde(default)]
2373    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
2374}
2375
2376impl PackMetadata {
2377    fn from_wasm(bytes: &[u8]) -> Option<Self> {
2378        let parser = Parser::new(0);
2379        for payload in parser.parse_all(bytes) {
2380            let payload = payload.ok()?;
2381            match payload {
2382                Payload::CustomSection(section) => {
2383                    if section.name() == "greentic.manifest"
2384                        && let Ok(meta) = Self::from_bytes(section.data())
2385                    {
2386                        return Some(meta);
2387                    }
2388                }
2389                Payload::DataSection(reader) => {
2390                    for segment in reader.into_iter().flatten() {
2391                        if let Ok(meta) = Self::from_bytes(segment.data) {
2392                            return Some(meta);
2393                        }
2394                    }
2395                }
2396                _ => {}
2397            }
2398        }
2399        None
2400    }
2401
2402    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2403        #[derive(Deserialize)]
2404        struct RawManifest {
2405            pack_id: String,
2406            version: String,
2407            #[serde(default)]
2408            entry_flows: Vec<String>,
2409            #[serde(default)]
2410            flows: Vec<RawFlow>,
2411            #[serde(default)]
2412            secret_requirements: Vec<greentic_types::SecretRequirement>,
2413        }
2414
2415        #[derive(Deserialize)]
2416        struct RawFlow {
2417            id: String,
2418        }
2419
2420        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2421        let mut entry_flows = if manifest.entry_flows.is_empty() {
2422            manifest.flows.iter().map(|f| f.id.clone()).collect()
2423        } else {
2424            manifest.entry_flows.clone()
2425        };
2426        entry_flows.retain(|id| !id.is_empty());
2427        Ok(Self {
2428            pack_id: manifest.pack_id,
2429            version: manifest.version,
2430            entry_flows,
2431            secret_requirements: manifest.secret_requirements,
2432        })
2433    }
2434
2435    pub fn fallback(path: &Path) -> Self {
2436        let pack_id = path
2437            .file_stem()
2438            .map(|s| s.to_string_lossy().into_owned())
2439            .unwrap_or_else(|| "unknown-pack".to_string());
2440        Self {
2441            pack_id,
2442            version: "0.0.0".to_string(),
2443            entry_flows: Vec::new(),
2444            secret_requirements: Vec::new(),
2445        }
2446    }
2447
2448    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2449        let entry_flows = manifest
2450            .flows
2451            .iter()
2452            .map(|flow| flow.id.as_str().to_string())
2453            .collect::<Vec<_>>();
2454        Self {
2455            pack_id: manifest.pack_id.as_str().to_string(),
2456            version: manifest.version.to_string(),
2457            entry_flows,
2458            secret_requirements: manifest.secret_requirements.clone(),
2459        }
2460    }
2461}