greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11    ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21    self as host_v1, HostFns, add_all_v1_to_linker,
22    runner_host_http::RunnerHostHttp,
23    runner_host_kv::RunnerHostKv,
24    secrets_store::{SecretsError, SecretsStoreHost},
25    state_store::{
26        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27        StateStoreHost, TenantCtx as StateTenantCtx,
28    },
29    telemetry_logger::{
30        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32        TenantCtx as TelemetryTenantCtx,
33    },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40    ArtifactLocationV1, ComponentId, ComponentSourceRef, EXT_COMPONENT_SOURCES_V1, EnvId,
41    ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
42    NodeId, OutputMapping, Routing, StateKey as StoreStateKey, TeamId, TelemetryHints,
43    TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44    pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69
70use crate::config::HostConfig;
71use crate::secrets::{DynSecretsManager, read_secret_blocking};
72use crate::storage::state::STATE_PREFIX;
73use crate::storage::{DynSessionStore, DynStateStore};
74use crate::verify;
75use crate::wasi::RunnerWasiPolicy;
76use tracing::warn;
77use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
78use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
79
80use greentic_flow::model::FlowDoc;
81
/// Everything the host keeps for one loaded pack: where its artifacts live, the
/// decoded manifest (new or legacy format), its flows and compiled components,
/// and the host services (HTTP client, stores, secrets, OAuth settings, WASI
/// policy) made available to it.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Component artifact path (wasm file).
    path: PathBuf,
    /// Optional archive (.gtpack) used to load flows/manifests.
    archive_path: Option<PathBuf>,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Wasmtime engine associated with this pack's components.
    engine: Engine,
    /// Pack identity and entry-flow metadata.
    metadata: PackMetadata,
    /// Manifest in the current `greentic_types` format, when one was decoded.
    manifest: Option<greentic_types::PackManifest>,
    /// Manifest in the legacy `greentic_pack::builder` format, when one was decoded.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows loaded from the pack, if any.
    flows: Option<PackFlows>,
    /// Components keyed by string.
    // NOTE(review): presumably keyed by component id — confirm against the loader.
    components: HashMap<String, PackComponent>,
    /// Shared blocking HTTP client.
    http_client: Arc<BlockingClient>,
    /// Mutex-guarded map of `ComponentPre` values keyed by string.
    // NOTE(review): looks like a cache of pre-instantiated components — confirm.
    pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
    /// Optional session store handle.
    session_store: Option<DynSessionStore>,
    /// Optional state store handle.
    state_store: Option<DynStateStore>,
    /// WASI policy shared with component instances.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Provider registry; `None` until one is set.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Secrets manager handle.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
}
105
/// A component belonging to a pack, together with its name and version.
struct PackComponent {
    /// Component name (currently unread, hence the lint allowance).
    #[allow(dead_code)]
    name: String,
    /// Component version string (currently unread, hence the lint allowance).
    #[allow(dead_code)]
    version: String,
    /// The wasmtime component itself.
    component: Component,
}
113
/// Options controlling where component artifacts are looked up.
///
/// The `Default` value has no materialized root, no overrides, no cache
/// directory, and `dist_offline` set to `false`.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
    pub materialized_root: Option<PathBuf>,
    /// Explicit overrides mapping component id -> wasm path.
    pub overrides: HashMap<String, PathBuf>,
    /// If true, do not fetch remote components; require cached artifacts.
    pub dist_offline: bool,
    /// Optional cache directory for resolved remote components.
    pub dist_cache_dir: Option<PathBuf>,
}
125
126fn build_blocking_client() -> BlockingClient {
127    std::thread::spawn(|| {
128        BlockingClient::builder()
129            .no_proxy()
130            .build()
131            .expect("blocking client")
132    })
133    .join()
134    .expect("client build thread panicked")
135}
136
137fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
138    let (root, candidate) = if path.is_absolute() {
139        let parent = path
140            .parent()
141            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
142        let root = parent
143            .canonicalize()
144            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
145        let file = path
146            .file_name()
147            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
148        (root, PathBuf::from(file))
149    } else {
150        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
151        let base = if let Some(parent) = path.parent() {
152            cwd.join(parent)
153        } else {
154            cwd
155        };
156        let root = base
157            .canonicalize()
158            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
159        let file = path
160            .file_name()
161            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
162        (root, PathBuf::from(file))
163    };
164    let safe = normalize_under_root(&root, &candidate)?;
165    Ok((root, safe))
166}
167
/// Process-wide blocking HTTP client, built lazily on first access via
/// `build_blocking_client` and shared behind an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
169
/// Serializable summary of a flow contained in a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow type; (de)serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile string (the legacy loaders in this file fill it with the pack id).
    pub profile: String,
    /// Version string (the legacy loaders in this file fill it with the pack version).
    pub version: String,
    /// Optional free-form description; defaults to `None` when absent on deserialization.
    #[serde(default)]
    pub description: Option<String>,
}
180
/// Host-side state backing the interfaces exposed to pack components
/// (secrets, HTTP client, state store, telemetry, runner-host HTTP/KV).
pub struct HostState {
    /// Shared host configuration (tenant, HTTP enablement, secrets policy, ...).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name used when a call supplies no tenant context;
    /// taken from `GREENTIC_ENV`, or `"local"` if that is unavailable.
    default_env: String,
    /// Optional session store handle (currently unread, hence the lint allowance).
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store; state-store calls fail with `unavailable` when absent.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real secrets/HTTP/telemetry handling.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used for real secret lookups.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state, default-constructed in `HostState::new`.
    oauth_host: OAuthBrokerHost,
}
193
194impl HostState {
195    #[allow(clippy::default_constructed_unit_structs)]
196    pub fn new(
197        config: Arc<HostConfig>,
198        http_client: Arc<BlockingClient>,
199        mocks: Option<Arc<MockLayer>>,
200        session_store: Option<DynSessionStore>,
201        state_store: Option<DynStateStore>,
202        secrets: DynSecretsManager,
203        oauth_config: Option<OAuthBrokerConfig>,
204    ) -> Result<Self> {
205        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
206        Ok(Self {
207            config,
208            http_client,
209            default_env,
210            session_store,
211            state_store,
212            mocks,
213            secrets,
214            oauth_config,
215            oauth_host: OAuthBrokerHost::default(),
216        })
217    }
218
219    pub fn get_secret(&self, key: &str) -> Result<String> {
220        if provider_core_only::is_enabled() {
221            bail!(provider_core_only::blocked_message("secrets"))
222        }
223        if !self.config.secrets_policy.is_allowed(key) {
224            bail!("secret {key} is not permitted by bindings policy");
225        }
226        if let Some(mock) = &self.mocks
227            && let Some(value) = mock.secrets_lookup(key)
228        {
229            return Ok(value);
230        }
231        let bytes = read_secret_blocking(&self.secrets, key)
232            .context("failed to read secret from manager")?;
233        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
234        Ok(value)
235    }
236
237    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
238        let tenant_raw = ctx
239            .as_ref()
240            .map(|ctx| ctx.tenant.clone())
241            .unwrap_or_else(|| self.config.tenant.clone());
242        let env_raw = ctx
243            .as_ref()
244            .map(|ctx| ctx.env.clone())
245            .unwrap_or_else(|| self.default_env.clone());
246        let tenant_id = TenantId::from_str(&tenant_raw)
247            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
248        let env_id = EnvId::from_str(&env_raw)
249            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
250        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
251        if let Some(ctx) = ctx {
252            if let Some(team) = ctx.team.or(ctx.team_id) {
253                let team_id =
254                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
255                tenant_ctx = tenant_ctx.with_team(Some(team_id));
256            }
257            if let Some(user) = ctx.user.or(ctx.user_id) {
258                let user_id =
259                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
260                tenant_ctx = tenant_ctx.with_user(Some(user_id));
261            }
262            if let Some(flow) = ctx.flow_id {
263                tenant_ctx = tenant_ctx.with_flow(flow);
264            }
265            if let Some(node) = ctx.node_id {
266                tenant_ctx = tenant_ctx.with_node(node);
267            }
268            if let Some(provider) = ctx.provider_id {
269                tenant_ctx = tenant_ctx.with_provider(provider);
270            }
271            if let Some(session) = ctx.session_id {
272                tenant_ctx = tenant_ctx.with_session(session);
273            }
274            tenant_ctx.trace_id = ctx.trace_id;
275        }
276        Ok(tenant_ctx)
277    }
278
279    fn send_http_request(
280        &mut self,
281        req: HttpRequest,
282        opts: Option<HttpRequestOptionsV1_1>,
283        _ctx: Option<HttpTenantCtx>,
284    ) -> Result<HttpResponse, HttpClientError> {
285        if !self.config.http_enabled {
286            return Err(HttpClientError {
287                code: "denied".into(),
288                message: "http client disabled by policy".into(),
289            });
290        }
291
292        let mut mock_state = None;
293        let raw_body = req.body.clone();
294        if let Some(mock) = &self.mocks
295            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
296        {
297            match mock.http_begin(&meta) {
298                HttpDecision::Mock(response) => {
299                    let headers = response
300                        .headers
301                        .iter()
302                        .map(|(k, v)| (k.clone(), v.clone()))
303                        .collect();
304                    return Ok(HttpResponse {
305                        status: response.status,
306                        headers,
307                        body: response.body.clone().map(|b| b.into_bytes()),
308                    });
309                }
310                HttpDecision::Deny(reason) => {
311                    return Err(HttpClientError {
312                        code: "denied".into(),
313                        message: reason,
314                    });
315                }
316                HttpDecision::Passthrough { record } => {
317                    mock_state = Some((meta, record));
318                }
319            }
320        }
321
322        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
323        let mut builder = self.http_client.request(method, &req.url);
324        for (key, value) in req.headers {
325            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
326                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
327            {
328                builder = builder.header(header, header_value);
329            }
330        }
331
332        if let Some(body) = raw_body.clone() {
333            builder = builder.body(body);
334        }
335
336        if let Some(opts) = opts {
337            if let Some(timeout_ms) = opts.timeout_ms {
338                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
339            }
340            if opts.allow_insecure == Some(true) {
341                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
342            }
343            if let Some(follow_redirects) = opts.follow_redirects
344                && !follow_redirects
345            {
346                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
347            }
348        }
349
350        let response = match builder.send() {
351            Ok(resp) => resp,
352            Err(err) => {
353                warn!(url = %req.url, error = %err, "http client request failed");
354                return Err(HttpClientError {
355                    code: "unavailable".into(),
356                    message: err.to_string(),
357                });
358            }
359        };
360
361        let status = response.status().as_u16();
362        let headers_vec = response
363            .headers()
364            .iter()
365            .map(|(k, v)| {
366                (
367                    k.as_str().to_string(),
368                    v.to_str().unwrap_or_default().to_string(),
369                )
370            })
371            .collect::<Vec<_>>();
372        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
373
374        if let Some((meta, true)) = mock_state.take()
375            && let Some(mock) = &self.mocks
376        {
377            let recorded = HttpMockResponse::new(
378                status,
379                headers_vec.clone().into_iter().collect(),
380                body_bytes
381                    .as_ref()
382                    .map(|b| String::from_utf8_lossy(b).into_owned()),
383            );
384            mock.http_record(&meta, &recorded);
385        }
386
387        Ok(HttpResponse {
388            status,
389            headers: headers_vec,
390            body: body_bytes,
391        })
392    }
393}
394
395impl SecretsStoreHost for HostState {
396    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
397        if provider_core_only::is_enabled() {
398            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
399            return Err(SecretsError::Denied);
400        }
401        if !self.config.secrets_policy.is_allowed(&key) {
402            return Err(SecretsError::Denied);
403        }
404        if let Some(mock) = &self.mocks
405            && let Some(value) = mock.secrets_lookup(&key)
406        {
407            return Ok(Some(value.into_bytes()));
408        }
409        match read_secret_blocking(&self.secrets, &key) {
410            Ok(bytes) => Ok(Some(bytes)),
411            Err(err) => {
412                warn!(secret = %key, error = %err, "secret lookup failed");
413                Err(SecretsError::NotFound)
414            }
415        }
416    }
417}
418
419impl HttpClientHost for HostState {
420    fn send(
421        &mut self,
422        req: HttpRequest,
423        ctx: Option<HttpTenantCtx>,
424    ) -> Result<HttpResponse, HttpClientError> {
425        self.send_http_request(req, None, ctx)
426    }
427}
428
429impl HttpClientHostV1_1 for HostState {
430    fn send(
431        &mut self,
432        req: HttpRequestV1_1,
433        opts: Option<HttpRequestOptionsV1_1>,
434        ctx: Option<HttpTenantCtxV1_1>,
435    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
436        let legacy_req = HttpRequest {
437            method: req.method,
438            url: req.url,
439            headers: req.headers,
440            body: req.body,
441        };
442        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
443            env: ctx.env,
444            tenant: ctx.tenant,
445            tenant_id: ctx.tenant_id,
446            team: ctx.team,
447            team_id: ctx.team_id,
448            user: ctx.user,
449            user_id: ctx.user_id,
450            trace_id: ctx.trace_id,
451            correlation_id: ctx.correlation_id,
452            attributes: ctx.attributes,
453            session_id: ctx.session_id,
454            flow_id: ctx.flow_id,
455            node_id: ctx.node_id,
456            provider_id: ctx.provider_id,
457            deadline_ms: ctx.deadline_ms,
458            attempt: ctx.attempt,
459            idempotency_key: ctx.idempotency_key,
460            impersonation: ctx
461                .impersonation
462                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
463                    actor_id,
464                    reason,
465                }),
466        });
467
468        self.send_http_request(legacy_req, opts, legacy_ctx)
469            .map(|resp| HttpResponseV1_1 {
470                status: resp.status,
471                headers: resp.headers,
472                body: resp.body,
473            })
474            .map_err(|err| HttpClientErrorV1_1 {
475                code: err.code,
476                message: err.message,
477            })
478    }
479}
480
481impl StateStoreHost for HostState {
482    fn read(
483        &mut self,
484        key: HostStateKey,
485        ctx: Option<StateTenantCtx>,
486    ) -> Result<Vec<u8>, StateError> {
487        let store = match self.state_store.as_ref() {
488            Some(store) => store.clone(),
489            None => {
490                return Err(StateError {
491                    code: "unavailable".into(),
492                    message: "state store not configured".into(),
493                });
494            }
495        };
496        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
497            Ok(ctx) => ctx,
498            Err(err) => {
499                return Err(StateError {
500                    code: "invalid-ctx".into(),
501                    message: err.to_string(),
502                });
503            }
504        };
505        let key = StoreStateKey::from(key);
506        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
507            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
508            Ok(None) => Err(StateError {
509                code: "not_found".into(),
510                message: "state key not found".into(),
511            }),
512            Err(err) => Err(StateError {
513                code: "internal".into(),
514                message: err.to_string(),
515            }),
516        }
517    }
518
519    fn write(
520        &mut self,
521        key: HostStateKey,
522        bytes: Vec<u8>,
523        ctx: Option<StateTenantCtx>,
524    ) -> Result<StateOpAck, StateError> {
525        let store = match self.state_store.as_ref() {
526            Some(store) => store.clone(),
527            None => {
528                return Err(StateError {
529                    code: "unavailable".into(),
530                    message: "state store not configured".into(),
531                });
532            }
533        };
534        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
535            Ok(ctx) => ctx,
536            Err(err) => {
537                return Err(StateError {
538                    code: "invalid-ctx".into(),
539                    message: err.to_string(),
540                });
541            }
542        };
543        let key = StoreStateKey::from(key);
544        let value = serde_json::from_slice(&bytes)
545            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
546        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
547            Ok(()) => Ok(StateOpAck::Ok),
548            Err(err) => Err(StateError {
549                code: "internal".into(),
550                message: err.to_string(),
551            }),
552        }
553    }
554
555    fn delete(
556        &mut self,
557        key: HostStateKey,
558        ctx: Option<StateTenantCtx>,
559    ) -> Result<StateOpAck, StateError> {
560        let store = match self.state_store.as_ref() {
561            Some(store) => store.clone(),
562            None => {
563                return Err(StateError {
564                    code: "unavailable".into(),
565                    message: "state store not configured".into(),
566                });
567            }
568        };
569        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
570            Ok(ctx) => ctx,
571            Err(err) => {
572                return Err(StateError {
573                    code: "invalid-ctx".into(),
574                    message: err.to_string(),
575                });
576            }
577        };
578        let key = StoreStateKey::from(key);
579        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
580            Ok(_) => Ok(StateOpAck::Ok),
581            Err(err) => Err(StateError {
582                code: "internal".into(),
583                message: err.to_string(),
584            }),
585        }
586    }
587}
588
589impl TelemetryLoggerHost for HostState {
590    fn log(
591        &mut self,
592        span: TelemetrySpanContext,
593        fields: Vec<(String, String)>,
594        _ctx: Option<TelemetryTenantCtx>,
595    ) -> Result<TelemetryAck, TelemetryError> {
596        if let Some(mock) = &self.mocks
597            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
598        {
599            return Ok(TelemetryAck::Ok);
600        }
601        let mut map = serde_json::Map::new();
602        for (k, v) in fields {
603            map.insert(k, Value::String(v));
604        }
605        tracing::info!(
606            tenant = %span.tenant,
607            flow_id = %span.flow_id,
608            node = ?span.node_id,
609            provider = %span.provider,
610            fields = %serde_json::Value::Object(map.clone()),
611            "telemetry log from pack"
612        );
613        Ok(TelemetryAck::Ok)
614    }
615}
616
617impl RunnerHostHttp for HostState {
618    fn request(
619        &mut self,
620        method: String,
621        url: String,
622        headers: Vec<String>,
623        body: Option<Vec<u8>>,
624    ) -> Result<Vec<u8>, String> {
625        let req = HttpRequest {
626            method,
627            url,
628            headers: headers
629                .chunks(2)
630                .filter_map(|chunk| {
631                    if chunk.len() == 2 {
632                        Some((chunk[0].clone(), chunk[1].clone()))
633                    } else {
634                        None
635                    }
636                })
637                .collect(),
638            body,
639        };
640        match HttpClientHost::send(self, req, None) {
641            Ok(resp) => Ok(resp.body.unwrap_or_default()),
642            Err(err) => Err(err.message),
643        }
644    }
645}
646
/// Stub key/value interface: nothing is stored and lookups never find a value.
impl RunnerHostKv for HostState {
    /// Always returns `None`; the namespace and key are ignored.
    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
        None
    }

    /// Discards the value; all arguments are ignored.
    fn put(&mut self, _ns: String, _key: String, _val: String) {}
}
654
/// Outcome of loading `manifest.cbor`: which manifest format was decoded,
/// together with the flows derived from it.
enum ManifestLoad {
    /// The manifest decoded with `decode_pack_manifest` (current format).
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// The manifest decoded as a legacy `greentic_pack::builder::PackManifest`.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
665
666fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
667    let mut archive = ZipArchive::new(File::open(path)?)
668        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
669    let bytes = read_entry(&mut archive, "manifest.cbor")
670        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
671    match decode_pack_manifest(&bytes) {
672        Ok(manifest) => {
673            let cache = PackFlows::from_manifest(manifest.clone());
674            Ok(ManifestLoad::New {
675                manifest: Box::new(manifest),
676                flows: cache,
677            })
678        }
679        Err(err) => {
680            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
681            // Fall back to legacy pack manifest
682            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
683                .context("failed to decode legacy pack manifest from manifest.cbor")?;
684            let flows = load_legacy_flows(&mut archive, &legacy)?;
685            Ok(ManifestLoad::Legacy {
686                manifest: Box::new(legacy),
687                flows,
688            })
689        }
690    }
691}
692
693fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
694    let manifest_path = root.join("manifest.cbor");
695    let bytes = std::fs::read(&manifest_path)
696        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
697    match decode_pack_manifest(&bytes) {
698        Ok(manifest) => {
699            let cache = PackFlows::from_manifest(manifest.clone());
700            Ok(ManifestLoad::New {
701                manifest: Box::new(manifest),
702                flows: cache,
703            })
704        }
705        Err(err) => {
706            tracing::debug!(
707                error = %err,
708                pack = %root.display(),
709                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
710            );
711            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
712                .context("failed to decode legacy pack manifest from manifest.cbor")?;
713            let flows = load_legacy_flows_from_dir(root, &legacy)?;
714            Ok(ManifestLoad::Legacy {
715                manifest: Box::new(legacy),
716                flows,
717            })
718        }
719    }
720}
721
722fn load_legacy_flows(
723    archive: &mut ZipArchive<File>,
724    manifest: &legacy_pack::PackManifest,
725) -> Result<PackFlows> {
726    let mut flows = HashMap::new();
727    let mut descriptors = Vec::new();
728
729    for entry in &manifest.flows {
730        let bytes = read_entry(archive, &entry.file_json)
731            .with_context(|| format!("missing flow json {}", entry.file_json))?;
732        let doc: FlowDoc = serde_json::from_slice(&bytes)
733            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
734        let normalized = normalize_flow_doc(doc);
735        let flow_ir = flow_doc_to_ir(normalized)?;
736        let flow = flow_ir_to_flow(flow_ir)?;
737
738        descriptors.push(FlowDescriptor {
739            id: entry.id.clone(),
740            flow_type: entry.kind.clone(),
741            profile: manifest.meta.pack_id.clone(),
742            version: manifest.meta.version.to_string(),
743            description: None,
744        });
745        flows.insert(entry.id.clone(), flow);
746    }
747
748    let mut entry_flows = manifest.meta.entry_flows.clone();
749    if entry_flows.is_empty() {
750        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
751    }
752    let metadata = PackMetadata {
753        pack_id: manifest.meta.pack_id.clone(),
754        version: manifest.meta.version.to_string(),
755        entry_flows,
756        secret_requirements: Vec::new(),
757    };
758
759    Ok(PackFlows {
760        descriptors,
761        flows,
762        metadata,
763    })
764}
765
766fn load_legacy_flows_from_dir(
767    root: &Path,
768    manifest: &legacy_pack::PackManifest,
769) -> Result<PackFlows> {
770    let mut flows = HashMap::new();
771    let mut descriptors = Vec::new();
772
773    for entry in &manifest.flows {
774        let path = root.join(&entry.file_json);
775        let bytes = std::fs::read(&path)
776            .with_context(|| format!("missing flow json {}", path.display()))?;
777        let doc: FlowDoc = serde_json::from_slice(&bytes)
778            .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
779        let normalized = normalize_flow_doc(doc);
780        let flow_ir = flow_doc_to_ir(normalized)?;
781        let flow = flow_ir_to_flow(flow_ir)?;
782
783        descriptors.push(FlowDescriptor {
784            id: entry.id.clone(),
785            flow_type: entry.kind.clone(),
786            profile: manifest.meta.pack_id.clone(),
787            version: manifest.meta.version.to_string(),
788            description: None,
789        });
790        flows.insert(entry.id.clone(), flow);
791    }
792
793    let mut entry_flows = manifest.meta.entry_flows.clone();
794    if entry_flows.is_empty() {
795        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
796    }
797    let metadata = PackMetadata {
798        pack_id: manifest.meta.pack_id.clone(),
799        version: manifest.meta.version.to_string(),
800        entry_flows,
801        secret_requirements: Vec::new(),
802    };
803
804    Ok(PackFlows {
805        descriptors,
806        flows,
807        metadata,
808    })
809}
810
/// Per-component state: the host services plus the WASI context and resource
/// table exposed through `WasiView`.
pub struct ComponentState {
    /// Host services backing the component-facing interfaces.
    pub host: HostState,
    /// WASI context produced by the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table handed out alongside the WASI context.
    resource_table: ResourceTable,
}
816
817impl ComponentState {
818    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
819        let wasi_ctx = policy
820            .instantiate()
821            .context("failed to build WASI context")?;
822        Ok(Self {
823            host,
824            wasi_ctx,
825            resource_table: ResourceTable::new(),
826        })
827    }
828
829    fn host_mut(&mut self) -> &mut HostState {
830        &mut self.host
831    }
832}
833
/// Component control interface: cancellation is never requested and yielding
/// does nothing.
impl control::Host for ComponentState {
    /// Always reports that the component should not cancel.
    fn should_cancel(&mut self) -> bool {
        false
    }

    /// Does nothing.
    fn yield_now(&mut self) {
        // no-op cooperative yield
    }
}
843
844fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
845    let mut inst = linker.instance("greentic:component/control@0.4.0")?;
846    inst.func_wrap(
847        "should-cancel",
848        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
849            let host = caller.data_mut();
850            Ok((ComponentControlHost::should_cancel(host),))
851        },
852    )?;
853    inst.func_wrap(
854        "yield-now",
855        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
856            let host = caller.data_mut();
857            ComponentControlHost::yield_now(host);
858            Ok(())
859        },
860    )?;
861    Ok(())
862}
863
/// Registers the host imports a pack component can link against.
///
/// Calls `add_wasi_to_linker` and then `add_all_v1_to_linker`. Every v1
/// interface enabled below is served by the [`HostState`] returned from
/// `ComponentState::host_mut`; the OAuth broker slot is left as `None`.
///
/// # Errors
/// Propagates any error reported by either registration call.
pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            // Not wired up through this helper.
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            state_store: Some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
881
/// Exposes the OAuth-related parts of the wrapped [`HostState`].
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// Environment name stored on the host state as `default_env`.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the host's OAuth broker state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, or `None` when the host has none.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
899
/// Gives the WASI host implementation access to this state's WASI data.
impl WasiView for ComponentState {
    /// Returns mutable borrows of the WASI context and the resource table.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi_ctx,
            table: &mut self.resource_table,
        }
    }
}
908
// NOTE(review): these impls promise the compiler that `ComponentState` may be
// moved to and shared between threads. Nothing visible here records why that
// holds for `HostState`, `WasiCtx` and `ResourceTable`; the invariant should be
// written down in a `// SAFETY:` comment — TODO confirm with the owners.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
913
914impl PackRuntime {
915    #[allow(clippy::too_many_arguments)]
916    pub async fn load(
917        path: impl AsRef<Path>,
918        config: Arc<HostConfig>,
919        mocks: Option<Arc<MockLayer>>,
920        archive_source: Option<&Path>,
921        session_store: Option<DynSessionStore>,
922        state_store: Option<DynStateStore>,
923        wasi_policy: Arc<RunnerWasiPolicy>,
924        secrets: DynSecretsManager,
925        oauth_config: Option<OAuthBrokerConfig>,
926        verify_archive: bool,
927        component_resolution: ComponentResolution,
928    ) -> Result<Self> {
929        let path = path.as_ref();
930        let (_pack_root, safe_path) = normalize_pack_path(path)?;
931        let path_meta = std::fs::metadata(&safe_path).ok();
932        let is_dir = path_meta
933            .as_ref()
934            .map(|meta| meta.is_dir())
935            .unwrap_or(false);
936        let is_component = !is_dir
937            && safe_path
938                .extension()
939                .and_then(|ext| ext.to_str())
940                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
941                .unwrap_or(false);
942        let archive_hint_path = if let Some(source) = archive_source {
943            let (_, normalized) = normalize_pack_path(source)?;
944            Some(normalized)
945        } else if is_component || is_dir {
946            None
947        } else {
948            Some(safe_path.clone())
949        };
950        let archive_hint = archive_hint_path.as_deref();
951        if verify_archive {
952            if let Some(verify_target) = archive_hint.and_then(|p| {
953                std::fs::metadata(p)
954                    .ok()
955                    .filter(|meta| meta.is_file())
956                    .map(|_| p)
957            }) {
958                verify::verify_pack(verify_target).await?;
959                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
960            } else {
961                tracing::debug!("skipping archive verification (no archive source)");
962            }
963        }
964        let engine = Engine::default();
965        let mut metadata = PackMetadata::fallback(&safe_path);
966        let mut manifest = None;
967        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
968        let mut flows = None;
969        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
970            if is_dir {
971                Some(safe_path.clone())
972            } else {
973                None
974            }
975        });
976
977        if let Some(root) = materialized_root.as_ref() {
978            match load_manifest_and_flows_from_dir(root) {
979                Ok(ManifestLoad::New {
980                    manifest: m,
981                    flows: cache,
982                }) => {
983                    metadata = cache.metadata.clone();
984                    manifest = Some(*m);
985                    flows = Some(cache);
986                }
987                Ok(ManifestLoad::Legacy {
988                    manifest: m,
989                    flows: cache,
990                }) => {
991                    metadata = cache.metadata.clone();
992                    legacy_manifest = Some(m);
993                    flows = Some(cache);
994                }
995                Err(err) => {
996                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
997                }
998            }
999        }
1000
1001        if manifest.is_none()
1002            && legacy_manifest.is_none()
1003            && let Some(archive_path) = archive_hint
1004        {
1005            match load_manifest_and_flows(archive_path) {
1006                Ok(ManifestLoad::New {
1007                    manifest: m,
1008                    flows: cache,
1009                }) => {
1010                    metadata = cache.metadata.clone();
1011                    manifest = Some(*m);
1012                    flows = Some(cache);
1013                }
1014                Ok(ManifestLoad::Legacy {
1015                    manifest: m,
1016                    flows: cache,
1017                }) => {
1018                    metadata = cache.metadata.clone();
1019                    legacy_manifest = Some(m);
1020                    flows = Some(cache);
1021                }
1022                Err(err) => {
1023                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1024                }
1025            }
1026        }
1027        let component_sources = if let Some(manifest) = manifest.as_ref() {
1028            component_sources_table(manifest).context("invalid component sources extension")?
1029        } else {
1030            None
1031        };
1032        let components = if is_component {
1033            let wasm_bytes = fs::read(&safe_path).await?;
1034            metadata = PackMetadata::from_wasm(&wasm_bytes)
1035                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1036            let name = safe_path
1037                .file_stem()
1038                .map(|s| s.to_string_lossy().to_string())
1039                .unwrap_or_else(|| "component".to_string());
1040            let component = Component::from_binary(&engine, &wasm_bytes)?;
1041            let mut map = HashMap::new();
1042            map.insert(
1043                name.clone(),
1044                PackComponent {
1045                    name,
1046                    version: metadata.version.clone(),
1047                    component,
1048                },
1049            );
1050            map
1051        } else {
1052            let specs = component_specs(manifest.as_ref(), legacy_manifest.as_deref());
1053            if specs.is_empty() {
1054                HashMap::new()
1055            } else {
1056                let mut loaded = HashMap::new();
1057                let mut missing: HashSet<String> =
1058                    specs.iter().map(|spec| spec.id.clone()).collect();
1059                let mut searched = Vec::new();
1060
1061                if !component_resolution.overrides.is_empty() {
1062                    load_components_from_overrides(
1063                        &engine,
1064                        &component_resolution.overrides,
1065                        &specs,
1066                        &mut missing,
1067                        &mut loaded,
1068                    )?;
1069                    searched.push("override map".to_string());
1070                }
1071
1072                if let Some(component_sources) = component_sources.as_ref() {
1073                    load_components_from_sources(
1074                        &engine,
1075                        component_sources,
1076                        &component_resolution,
1077                        &specs,
1078                        &mut missing,
1079                        &mut loaded,
1080                        materialized_root.as_deref(),
1081                        archive_hint,
1082                    )
1083                    .await?;
1084                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1085                }
1086
1087                if let Some(root) = materialized_root.as_ref() {
1088                    load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
1089                    searched.push(format!("components dir {}", root.display()));
1090                }
1091
1092                if let Some(archive_path) = archive_hint {
1093                    load_components_from_archive(
1094                        &engine,
1095                        archive_path,
1096                        &specs,
1097                        &mut missing,
1098                        &mut loaded,
1099                    )?;
1100                    searched.push(format!("archive {}", archive_path.display()));
1101                }
1102
1103                if !missing.is_empty() {
1104                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1105                    let sources = if searched.is_empty() {
1106                        "no component sources".to_string()
1107                    } else {
1108                        searched.join(", ")
1109                    };
1110                    bail!(
1111                        "components missing: {}; looked in {}",
1112                        missing_list,
1113                        sources
1114                    );
1115                }
1116
1117                loaded
1118            }
1119        };
1120        let http_client = Arc::clone(&HTTP_CLIENT);
1121        Ok(Self {
1122            path: safe_path,
1123            archive_path: archive_hint.map(Path::to_path_buf),
1124            config,
1125            engine,
1126            metadata,
1127            manifest,
1128            legacy_manifest,
1129            mocks,
1130            flows,
1131            components,
1132            http_client,
1133            pre_cache: Mutex::new(HashMap::new()),
1134            session_store,
1135            state_store,
1136            wasi_policy,
1137            provider_registry: RwLock::new(None),
1138            secrets,
1139            oauth_config,
1140        })
1141    }
1142
1143    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1144        if let Some(cache) = &self.flows {
1145            return Ok(cache.descriptors.clone());
1146        }
1147        if let Some(manifest) = &self.manifest {
1148            let descriptors = manifest
1149                .flows
1150                .iter()
1151                .map(|flow| FlowDescriptor {
1152                    id: flow.id.as_str().to_string(),
1153                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1154                    profile: manifest.pack_id.as_str().to_string(),
1155                    version: manifest.version.to_string(),
1156                    description: None,
1157                })
1158                .collect();
1159            return Ok(descriptors);
1160        }
1161        Ok(Vec::new())
1162    }
1163
1164    #[allow(dead_code)]
1165    pub async fn run_flow(
1166        &self,
1167        flow_id: &str,
1168        input: serde_json::Value,
1169    ) -> Result<serde_json::Value> {
1170        let pack = Arc::new(
1171            PackRuntime::load(
1172                &self.path,
1173                Arc::clone(&self.config),
1174                self.mocks.clone(),
1175                self.archive_path.as_deref(),
1176                self.session_store.clone(),
1177                self.state_store.clone(),
1178                Arc::clone(&self.wasi_policy),
1179                self.secrets.clone(),
1180                self.oauth_config.clone(),
1181                false,
1182                ComponentResolution::default(),
1183            )
1184            .await?,
1185        );
1186
1187        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1188        let retry_config = self.config.retry_config().into();
1189        let mocks = pack.mocks.as_deref();
1190        let tenant = self.config.tenant.as_str();
1191
1192        let ctx = FlowContext {
1193            tenant,
1194            flow_id,
1195            node_id: None,
1196            tool: None,
1197            action: None,
1198            session_id: None,
1199            provider_id: None,
1200            retry_config,
1201            observer: None,
1202            mocks,
1203        };
1204
1205        let execution = engine.execute(ctx, input).await?;
1206        match execution.status {
1207            FlowStatus::Completed => Ok(execution.output),
1208            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1209                "status": "pending",
1210                "reason": wait.reason,
1211                "resume": wait.snapshot,
1212                "response": execution.output,
1213            })),
1214        }
1215    }
1216
1217    pub async fn invoke_component(
1218        &self,
1219        component_ref: &str,
1220        ctx: ComponentExecCtx,
1221        operation: &str,
1222        _config_json: Option<String>,
1223        input_json: String,
1224    ) -> Result<Value> {
1225        let pack_component = self
1226            .components
1227            .get(component_ref)
1228            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1229
1230        let mut linker = Linker::new(&self.engine);
1231        register_all(&mut linker)?;
1232        add_component_control_to_linker(&mut linker)?;
1233        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1234        let pre: ComponentPre<ComponentState> = ComponentPre::new(pre_instance)?;
1235
1236        let host_state = HostState::new(
1237            Arc::clone(&self.config),
1238            Arc::clone(&self.http_client),
1239            self.mocks.clone(),
1240            self.session_store.clone(),
1241            self.state_store.clone(),
1242            Arc::clone(&self.secrets),
1243            self.oauth_config.clone(),
1244        )?;
1245        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1246        let mut store = wasmtime::Store::new(&self.engine, store_state);
1247        let bindings: crate::component_api::Component = pre.instantiate_async(&mut store).await?;
1248        let node = bindings.greentic_component_node();
1249
1250        let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1251
1252        match result {
1253            InvokeResult::Ok(body) => {
1254                if body.is_empty() {
1255                    return Ok(Value::Null);
1256                }
1257                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1258            }
1259            InvokeResult::Err(NodeError {
1260                code,
1261                message,
1262                retryable,
1263                backoff_ms,
1264                details,
1265            }) => {
1266                let mut obj = serde_json::Map::new();
1267                obj.insert("ok".into(), Value::Bool(false));
1268                let mut error = serde_json::Map::new();
1269                error.insert("code".into(), Value::String(code));
1270                error.insert("message".into(), Value::String(message));
1271                error.insert("retryable".into(), Value::Bool(retryable));
1272                if let Some(backoff) = backoff_ms {
1273                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1274                }
1275                if let Some(details) = details {
1276                    error.insert(
1277                        "details".into(),
1278                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1279                    );
1280                }
1281                obj.insert("error".into(), Value::Object(error));
1282                Ok(Value::Object(obj))
1283            }
1284        }
1285    }
1286
1287    pub fn resolve_provider(
1288        &self,
1289        provider_id: Option<&str>,
1290        provider_type: Option<&str>,
1291    ) -> Result<ProviderBinding> {
1292        let registry = self.provider_registry()?;
1293        registry.resolve(provider_id, provider_type)
1294    }
1295
1296    pub async fn invoke_provider(
1297        &self,
1298        binding: &ProviderBinding,
1299        _ctx: ComponentExecCtx,
1300        op: &str,
1301        input_json: Vec<u8>,
1302    ) -> Result<Value> {
1303        let component_ref = &binding.component_ref;
1304        let pack_component = self
1305            .components
1306            .get(component_ref)
1307            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1308
1309        let mut linker = Linker::new(&self.engine);
1310        register_all(&mut linker)?;
1311        add_component_control_to_linker(&mut linker)?;
1312        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1313        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1314
1315        let host_state = HostState::new(
1316            Arc::clone(&self.config),
1317            Arc::clone(&self.http_client),
1318            self.mocks.clone(),
1319            self.session_store.clone(),
1320            self.state_store.clone(),
1321            Arc::clone(&self.secrets),
1322            self.oauth_config.clone(),
1323        )?;
1324        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1325        let mut store = wasmtime::Store::new(&self.engine, store_state);
1326        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1327        let provider = bindings.greentic_provider_core_schema_core_api();
1328
1329        let result = provider.call_invoke(&mut store, op, &input_json)?;
1330        deserialize_json_bytes(result)
1331    }
1332
1333    fn provider_registry(&self) -> Result<ProviderRegistry> {
1334        if let Some(registry) = self.provider_registry.read().clone() {
1335            return Ok(registry);
1336        }
1337        let manifest = self
1338            .manifest
1339            .as_ref()
1340            .context("pack manifest required for provider resolution")?;
1341        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1342        let registry = ProviderRegistry::new(
1343            manifest,
1344            self.state_store.clone(),
1345            &self.config.tenant,
1346            &env,
1347        )?;
1348        *self.provider_registry.write() = Some(registry.clone());
1349        Ok(registry)
1350    }
1351
1352    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1353        if let Some(cache) = &self.flows {
1354            return cache
1355                .flows
1356                .get(flow_id)
1357                .cloned()
1358                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1359        }
1360        if let Some(manifest) = &self.manifest {
1361            let entry = manifest
1362                .flows
1363                .iter()
1364                .find(|f| f.id.as_str() == flow_id)
1365                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1366            return Ok(entry.flow.clone());
1367        }
1368        bail!("flow '{flow_id}' not available (pack exports disabled)")
1369    }
1370
    /// Returns the metadata stored for this pack.
    pub fn metadata(&self) -> &PackMetadata {
        &self.metadata
    }
1374
    /// Returns the secret requirements listed in the pack metadata.
    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
        &self.metadata.secret_requirements
    }
1378
1379    pub fn missing_secrets(
1380        &self,
1381        tenant_ctx: &TypesTenantCtx,
1382    ) -> Vec<greentic_types::SecretRequirement> {
1383        let env = tenant_ctx.env.as_str().to_string();
1384        let tenant = tenant_ctx.tenant.as_str().to_string();
1385        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1386        self.required_secrets()
1387            .iter()
1388            .filter(|req| {
1389                // scope must match current context if provided
1390                if let Some(scope) = &req.scope {
1391                    if scope.env != env {
1392                        return false;
1393                    }
1394                    if scope.tenant != tenant {
1395                        return false;
1396                    }
1397                    if let Some(ref team_req) = scope.team
1398                        && team.as_ref() != Some(team_req)
1399                    {
1400                        return false;
1401                    }
1402                }
1403                read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1404            })
1405            .cloned()
1406            .collect()
1407    }
1408
1409    pub fn for_component_test(
1410        components: Vec<(String, PathBuf)>,
1411        flows: HashMap<String, FlowIR>,
1412        config: Arc<HostConfig>,
1413    ) -> Result<Self> {
1414        let engine = Engine::default();
1415        let mut component_map = HashMap::new();
1416        for (name, path) in components {
1417            if !path.exists() {
1418                bail!("component artifact missing: {}", path.display());
1419            }
1420            let wasm_bytes = std::fs::read(&path)?;
1421            let component = Component::from_binary(&engine, &wasm_bytes)
1422                .with_context(|| format!("failed to compile component {}", path.display()))?;
1423            component_map.insert(
1424                name.clone(),
1425                PackComponent {
1426                    name,
1427                    version: "0.0.0".into(),
1428                    component,
1429                },
1430            );
1431        }
1432
1433        let mut flow_map = HashMap::new();
1434        let mut descriptors = Vec::new();
1435        for (id, ir) in flows {
1436            let flow_type = ir.flow_type.clone();
1437            let flow = flow_ir_to_flow(ir)?;
1438            flow_map.insert(id.clone(), flow);
1439            descriptors.push(FlowDescriptor {
1440                id: id.clone(),
1441                flow_type,
1442                profile: "test".into(),
1443                version: "0.0.0".into(),
1444                description: None,
1445            });
1446        }
1447        let flows_cache = PackFlows {
1448            descriptors: descriptors.clone(),
1449            flows: flow_map,
1450            metadata: PackMetadata::fallback(Path::new("component-test")),
1451        };
1452
1453        Ok(Self {
1454            path: PathBuf::new(),
1455            archive_path: None,
1456            config,
1457            engine,
1458            metadata: PackMetadata::fallback(Path::new("component-test")),
1459            manifest: None,
1460            legacy_manifest: None,
1461            mocks: None,
1462            flows: Some(flows_cache),
1463            components: component_map,
1464            http_client: Arc::clone(&HTTP_CLIENT),
1465            pre_cache: Mutex::new(HashMap::new()),
1466            session_store: None,
1467            state_store: None,
1468            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1469            provider_registry: RwLock::new(None),
1470            secrets: crate::secrets::default_manager(),
1471            oauth_config: None,
1472        })
1473    }
1474}
1475
/// Flows of a pack together with their descriptors and the pack metadata.
struct PackFlows {
    /// One descriptor per flow, as returned by `PackRuntime::list_flows`.
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Metadata of the pack the flows belong to.
    metadata: PackMetadata,
}
1481
/// Manifest extension keys recognised as carrying runtime flow definitions.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1487
/// Object form of a runtime flow extension payload: `{ "flows": [...] }`.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The runtime flows contained in the bundle.
    flows: Vec<RuntimeFlow>,
}
1492
/// One flow as written in a runtime flow extension payload.
///
/// Converted into a `Flow` by `runtime_flow_to_flow`.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow identifier; parsed into a `FlowId` during conversion.
    id: String,
    /// Flow kind; the key `flow_type` is accepted as an alias.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Schema version; conversion falls back to `"1.0"` when absent.
    #[serde(default)]
    schema_version: Option<String>,
    /// Start node; becomes the `default` entrypoint when `entrypoints` is empty.
    #[serde(default)]
    start: Option<String>,
    /// Named entrypoints, carried over to the converted flow.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by node id.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; the default is used when absent.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
1508
/// One node of a [`RuntimeFlow`] as written in the extension payload.
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component identifier (alias `component`); parsed into a `ComponentId`.
    #[serde(alias = "component")]
    component_id: String,
    /// Operation to invoke on the component (alias `operation`).
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Node input (aliases `payload`, `input`); used as the input mapping.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Routing for the node; conversion uses `Routing::End` when absent.
    #[serde(default)]
    routing: Option<Routing>,
    /// Telemetry hints; the default is used when absent.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
1522
1523fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1524    if bytes.is_empty() {
1525        return Ok(Value::Null);
1526    }
1527    serde_json::from_slice(&bytes).or_else(|_| {
1528        String::from_utf8(bytes)
1529            .map(Value::String)
1530            .map_err(|err| anyhow!(err))
1531    })
1532}
1533
1534impl PackFlows {
1535    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1536        if let Some(flows) = flows_from_runtime_extension(&manifest) {
1537            return flows;
1538        }
1539        let descriptors = manifest
1540            .flows
1541            .iter()
1542            .map(|entry| FlowDescriptor {
1543                id: entry.id.as_str().to_string(),
1544                flow_type: flow_kind_to_str(entry.kind).to_string(),
1545                profile: manifest.pack_id.as_str().to_string(),
1546                version: manifest.version.to_string(),
1547                description: None,
1548            })
1549            .collect();
1550        let mut flows = HashMap::new();
1551        for entry in &manifest.flows {
1552            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1553        }
1554        Self {
1555            metadata: PackMetadata::from_manifest(&manifest),
1556            descriptors,
1557            flows,
1558        }
1559    }
1560}
1561
1562fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1563    let extensions = manifest.extensions.as_ref()?;
1564    let extension = extensions.iter().find_map(|(key, ext)| {
1565        if RUNTIME_FLOW_EXTENSION_IDS
1566            .iter()
1567            .any(|candidate| candidate == key)
1568        {
1569            Some(ext)
1570        } else {
1571            None
1572        }
1573    })?;
1574    let runtime_flows = match decode_runtime_flow_extension(extension) {
1575        Some(flows) if !flows.is_empty() => flows,
1576        _ => return None,
1577    };
1578
1579    let descriptors = runtime_flows
1580        .iter()
1581        .map(|flow| FlowDescriptor {
1582            id: flow.id.as_str().to_string(),
1583            flow_type: flow_kind_to_str(flow.kind).to_string(),
1584            profile: manifest.pack_id.as_str().to_string(),
1585            version: manifest.version.to_string(),
1586            description: None,
1587        })
1588        .collect::<Vec<_>>();
1589    let flows = runtime_flows
1590        .into_iter()
1591        .map(|flow| (flow.id.as_str().to_string(), flow))
1592        .collect();
1593
1594    Some(PackFlows {
1595        metadata: PackMetadata::from_manifest(manifest),
1596        descriptors,
1597        flows,
1598    })
1599}
1600
1601fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1602    let value = match extension.inline.as_ref()? {
1603        ExtensionInline::Other(value) => value.clone(),
1604        _ => return None,
1605    };
1606
1607    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1608        return Some(collect_runtime_flows(bundle.flows));
1609    }
1610
1611    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1612        return Some(collect_runtime_flows(flows));
1613    }
1614
1615    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1616        return Some(flows);
1617    }
1618
1619    warn!(
1620        extension = %extension.kind,
1621        version = %extension.version,
1622        "runtime flow extension present but could not be decoded"
1623    );
1624    None
1625}
1626
1627fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1628    flows
1629        .into_iter()
1630        .filter_map(|flow| match runtime_flow_to_flow(flow) {
1631            Ok(flow) => Some(flow),
1632            Err(err) => {
1633                warn!(error = %err, "failed to decode runtime flow");
1634                None
1635            }
1636        })
1637        .collect()
1638}
1639
1640fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1641    let flow_id = FlowId::from_str(&runtime.id)
1642        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1643    let mut entrypoints = runtime.entrypoints;
1644    if entrypoints.is_empty()
1645        && let Some(start) = &runtime.start
1646    {
1647        entrypoints.insert("default".into(), Value::String(start.clone()));
1648    }
1649
1650    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1651    for (id, node) in runtime.nodes {
1652        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1653        let component_id = ComponentId::from_str(&node.component_id)
1654            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1655        let component = FlowComponentRef {
1656            id: component_id,
1657            pack_alias: None,
1658            operation: node.operation_name,
1659        };
1660        let routing = node.routing.unwrap_or(Routing::End);
1661        let telemetry = node.telemetry.unwrap_or_default();
1662        nodes.insert(
1663            node_id.clone(),
1664            Node {
1665                id: node_id,
1666                component,
1667                input: InputMapping {
1668                    mapping: node.operation_payload,
1669                },
1670                output: OutputMapping {
1671                    mapping: Value::Null,
1672                },
1673                routing,
1674                telemetry,
1675            },
1676        );
1677    }
1678
1679    Ok(Flow {
1680        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1681        id: flow_id,
1682        kind: runtime.kind,
1683        entrypoints,
1684        nodes,
1685        metadata: runtime.metadata.unwrap_or_default(),
1686    })
1687}
1688
1689fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1690    match kind {
1691        greentic_types::FlowKind::Messaging => "messaging",
1692        greentic_types::FlowKind::Event => "event",
1693        greentic_types::FlowKind::ComponentConfig => "component-config",
1694        greentic_types::FlowKind::Job => "job",
1695        greentic_types::FlowKind::Http => "http",
1696    }
1697}
1698
1699fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1700    let mut file = archive
1701        .by_name(name)
1702        .with_context(|| format!("entry {name} missing from archive"))?;
1703    let mut buf = Vec::new();
1704    file.read_to_end(&mut buf)?;
1705    Ok(buf)
1706}
1707
1708fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1709    for node in doc.nodes.values_mut() {
1710        let Some((component_ref, payload)) = node
1711            .raw
1712            .iter()
1713            .next()
1714            .map(|(key, value)| (key.clone(), value.clone()))
1715        else {
1716            continue;
1717        };
1718        if component_ref.starts_with("emit.") {
1719            node.operation = Some(component_ref);
1720            node.payload = payload;
1721            node.raw.clear();
1722            continue;
1723        }
1724        let (target_component, operation, input, config) =
1725            infer_component_exec(&payload, &component_ref);
1726        let mut payload_obj = serde_json::Map::new();
1727        // component.exec is meta; ensure the payload carries the actual target component.
1728        payload_obj.insert("component".into(), Value::String(target_component));
1729        payload_obj.insert("operation".into(), Value::String(operation));
1730        payload_obj.insert("input".into(), input);
1731        if let Some(cfg) = config {
1732            payload_obj.insert("config".into(), cfg);
1733        }
1734        node.operation = Some("component.exec".to_string());
1735        node.payload = Value::Object(payload_obj);
1736        node.raw.clear();
1737    }
1738    doc
1739}
1740
1741fn infer_component_exec(
1742    payload: &Value,
1743    component_ref: &str,
1744) -> (String, String, Value, Option<Value>) {
1745    let default_op = if component_ref.starts_with("templating.") {
1746        "render"
1747    } else {
1748        "invoke"
1749    }
1750    .to_string();
1751
1752    if let Value::Object(map) = payload {
1753        let op = map
1754            .get("op")
1755            .or_else(|| map.get("operation"))
1756            .and_then(Value::as_str)
1757            .map(|s| s.to_string())
1758            .unwrap_or_else(|| default_op.clone());
1759
1760        let mut input = map.clone();
1761        let config = input.remove("config");
1762        let component = input
1763            .get("component")
1764            .or_else(|| input.get("component_ref"))
1765            .and_then(Value::as_str)
1766            .map(|s| s.to_string())
1767            .unwrap_or_else(|| component_ref.to_string());
1768        input.remove("component");
1769        input.remove("component_ref");
1770        input.remove("op");
1771        input.remove("operation");
1772        return (component, op, Value::Object(input), config);
1773    }
1774
1775    (component_ref.to_string(), default_op, payload.clone(), None)
1776}
1777
/// A component the pack declares, as needed by the component loaders.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component identifier (the manifest entry id, or the entry name for
    /// legacy manifests); also the key used in the `missing`/`into` collections.
    id: String,
    /// Component version rendered as a string.
    version: String,
    /// Wasm file location taken from a legacy manifest entry (`file_wasm`);
    /// `None` for current manifests, where `components/{id}.wasm` is used.
    legacy_path: Option<String>,
}
1784
/// Resolution data for one entry of the pack's component-sources extension.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Resolved digest recorded for the component; loaded bytes are verified
    /// against it before compilation.
    digest: String,
    /// Source reference of the component; its string form is what gets
    /// resolved through the distributor client for remote artifacts.
    source: ComponentSourceRef,
    /// Where the component bytes are expected to be found.
    artifact: ComponentArtifactLocation,
}
1791
/// Where the wasm bytes of a sourced component live.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// Shipped with the pack: `wasm_path` is resolved against the materialized
    /// pack root, or used as the entry name inside the pack archive.
    Inline { wasm_path: String },
    /// Not shipped with the pack; obtained through the distributor client.
    Remote,
}
1797
1798fn component_specs(
1799    manifest: Option<&greentic_types::PackManifest>,
1800    legacy_manifest: Option<&legacy_pack::PackManifest>,
1801) -> Vec<ComponentSpec> {
1802    if let Some(manifest) = manifest {
1803        return manifest
1804            .components
1805            .iter()
1806            .map(|entry| ComponentSpec {
1807                id: entry.id.as_str().to_string(),
1808                version: entry.version.to_string(),
1809                legacy_path: None,
1810            })
1811            .collect();
1812    }
1813    if let Some(legacy_manifest) = legacy_manifest {
1814        return legacy_manifest
1815            .components
1816            .iter()
1817            .map(|entry| ComponentSpec {
1818                id: entry.name.clone(),
1819                version: entry.version.to_string(),
1820                legacy_path: Some(entry.file_wasm.clone()),
1821            })
1822            .collect();
1823    }
1824    Vec::new()
1825}
1826
1827fn component_sources_table(
1828    manifest: &greentic_types::PackManifest,
1829) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
1830    let Some(sources) = manifest.get_component_sources_v1()? else {
1831        return Ok(None);
1832    };
1833    let mut table = HashMap::new();
1834    for entry in sources.components {
1835        let artifact = match entry.artifact {
1836            ArtifactLocationV1::Inline { wasm_path, .. } => {
1837                ComponentArtifactLocation::Inline { wasm_path }
1838            }
1839            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
1840        };
1841        let info = ComponentSourceInfo {
1842            digest: entry.resolved.digest,
1843            source: entry.source,
1844            artifact,
1845        };
1846        if let Some(component_id) = entry.component_id.as_ref() {
1847            table.insert(component_id.as_str().to_string(), info.clone());
1848        }
1849        table.insert(entry.name, info);
1850    }
1851    Ok(Some(table))
1852}
1853
1854fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
1855    if let Some(path) = &spec.legacy_path {
1856        return root.join(path);
1857    }
1858    root.join("components").join(format!("{}.wasm", spec.id))
1859}
1860
/// Ensures a digest string carries an algorithm prefix.
///
/// Digests already starting with `sha256:` or `blake3:` are returned
/// unchanged; anything else is assumed to be SHA-256 and gets a `sha256:`
/// prefix. The check is case-sensitive.
fn normalize_digest(digest: &str) -> String {
    const KNOWN_PREFIXES: [&str; 2] = ["sha256:", "blake3:"];

    let already_prefixed = KNOWN_PREFIXES
        .iter()
        .any(|prefix| digest.starts_with(prefix));
    if already_prefixed {
        digest.to_owned()
    } else {
        format!("sha256:{digest}")
    }
}
1868
1869fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
1870    if digest.starts_with("blake3:") {
1871        let hash = blake3::hash(bytes);
1872        return Ok(format!("blake3:{}", hash.to_hex()));
1873    }
1874    let mut hasher = sha2::Sha256::new();
1875    hasher.update(bytes);
1876    Ok(format!("sha256:{:x}", hasher.finalize()))
1877}
1878
1879fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
1880    let normalized_expected = normalize_digest(expected);
1881    let actual = compute_digest_for(bytes, &normalized_expected)?;
1882    if normalize_digest(&actual) != normalized_expected {
1883        bail!(
1884            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
1885        );
1886    }
1887    Ok(())
1888}
1889
1890fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
1891    let mut opts = DistOptions {
1892        allow_tags: true,
1893        ..DistOptions::default()
1894    };
1895    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
1896        opts.cache_dir = cache_dir;
1897    }
1898    if component_resolution.dist_offline {
1899        opts.offline = true;
1900    }
1901    opts
1902}
1903
1904#[allow(clippy::too_many_arguments)]
1905async fn load_components_from_sources(
1906    engine: &Engine,
1907    component_sources: &HashMap<String, ComponentSourceInfo>,
1908    component_resolution: &ComponentResolution,
1909    specs: &[ComponentSpec],
1910    missing: &mut HashSet<String>,
1911    into: &mut HashMap<String, PackComponent>,
1912    materialized_root: Option<&Path>,
1913    archive_hint: Option<&Path>,
1914) -> Result<()> {
1915    let mut archive = if let Some(path) = archive_hint {
1916        Some(
1917            ZipArchive::new(File::open(path)?)
1918                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
1919        )
1920    } else {
1921        None
1922    };
1923    let mut dist_client: Option<DistClient> = None;
1924
1925    for spec in specs {
1926        if !missing.contains(&spec.id) {
1927            continue;
1928        }
1929        let Some(source) = component_sources.get(&spec.id) else {
1930            continue;
1931        };
1932
1933        let bytes = match &source.artifact {
1934            ComponentArtifactLocation::Inline { wasm_path } => {
1935                if let Some(root) = materialized_root {
1936                    let path = root.join(wasm_path);
1937                    if path.exists() {
1938                        std::fs::read(&path).with_context(|| {
1939                            format!(
1940                                "failed to read inline component {} from {}",
1941                                spec.id,
1942                                path.display()
1943                            )
1944                        })?
1945                    } else if archive.is_none() {
1946                        bail!("inline component {} missing at {}", spec.id, path.display());
1947                    } else {
1948                        read_entry(
1949                            archive.as_mut().expect("archive present when needed"),
1950                            wasm_path,
1951                        )
1952                        .with_context(|| {
1953                            format!(
1954                                "inline component {} missing at {} in pack archive",
1955                                spec.id, wasm_path
1956                            )
1957                        })?
1958                    }
1959                } else if let Some(archive) = archive.as_mut() {
1960                    read_entry(archive, wasm_path).with_context(|| {
1961                        format!(
1962                            "inline component {} missing at {} in pack archive",
1963                            spec.id, wasm_path
1964                        )
1965                    })?
1966                } else {
1967                    bail!(
1968                        "inline component {} missing and no pack source available",
1969                        spec.id
1970                    );
1971                }
1972            }
1973            ComponentArtifactLocation::Remote => {
1974                let client = dist_client.get_or_insert_with(|| {
1975                    DistClient::new(dist_options_from(component_resolution))
1976                });
1977                let reference = source.source.to_string();
1978                let cache_path = if component_resolution.dist_offline {
1979                    client
1980                        .fetch_digest(&source.digest)
1981                        .await
1982                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
1983                } else {
1984                    let resolved = client
1985                        .resolve_ref(&reference)
1986                        .await
1987                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
1988                    let expected = normalize_digest(&source.digest);
1989                    let actual = normalize_digest(&resolved.digest);
1990                    if expected != actual {
1991                        bail!(
1992                            "component {} digest mismatch after fetch: expected {}, got {}",
1993                            spec.id,
1994                            expected,
1995                            actual
1996                        );
1997                    }
1998                    resolved.cache_path.ok_or_else(|| {
1999                        anyhow!(
2000                            "component {} resolved from {} but cache path is missing",
2001                            spec.id,
2002                            reference
2003                        )
2004                    })?
2005                };
2006                std::fs::read(&cache_path).with_context(|| {
2007                    format!(
2008                        "failed to read cached component {} from {}",
2009                        spec.id,
2010                        cache_path.display()
2011                    )
2012                })?
2013            }
2014        };
2015
2016        verify_component_digest(&spec.id, &source.digest, &bytes)?;
2017        let component = Component::from_binary(engine, &bytes)
2018            .with_context(|| format!("failed to compile component {}", spec.id))?;
2019        into.insert(
2020            spec.id.clone(),
2021            PackComponent {
2022                name: spec.id.clone(),
2023                version: spec.version.clone(),
2024                component,
2025            },
2026        );
2027        missing.remove(&spec.id);
2028    }
2029
2030    Ok(())
2031}
2032
2033fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2034    match err {
2035        DistError::CacheMiss { reference: missing } => anyhow!(
2036            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2037            component_id,
2038            missing,
2039            reference
2040        ),
2041        DistError::Offline { reference: blocked } => anyhow!(
2042            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2043            component_id,
2044            blocked,
2045            reference
2046        ),
2047        DistError::AuthRequired { target } => anyhow!(
2048            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2049            component_id,
2050            target,
2051            reference
2052        ),
2053        other => anyhow!(
2054            "failed to resolve component {} from {}: {}",
2055            component_id,
2056            reference,
2057            other
2058        ),
2059    }
2060}
2061
2062fn load_components_from_overrides(
2063    engine: &Engine,
2064    overrides: &HashMap<String, PathBuf>,
2065    specs: &[ComponentSpec],
2066    missing: &mut HashSet<String>,
2067    into: &mut HashMap<String, PackComponent>,
2068) -> Result<()> {
2069    for spec in specs {
2070        if !missing.contains(&spec.id) {
2071            continue;
2072        }
2073        let Some(path) = overrides.get(&spec.id) else {
2074            continue;
2075        };
2076        let bytes = std::fs::read(path)
2077            .with_context(|| format!("failed to read override component {}", path.display()))?;
2078        let component = Component::from_binary(engine, &bytes).with_context(|| {
2079            format!(
2080                "failed to compile component {} from override {}",
2081                spec.id,
2082                path.display()
2083            )
2084        })?;
2085        into.insert(
2086            spec.id.clone(),
2087            PackComponent {
2088                name: spec.id.clone(),
2089                version: spec.version.clone(),
2090                component,
2091            },
2092        );
2093        missing.remove(&spec.id);
2094    }
2095    Ok(())
2096}
2097
2098fn load_components_from_dir(
2099    engine: &Engine,
2100    root: &Path,
2101    specs: &[ComponentSpec],
2102    missing: &mut HashSet<String>,
2103    into: &mut HashMap<String, PackComponent>,
2104) -> Result<()> {
2105    for spec in specs {
2106        if !missing.contains(&spec.id) {
2107            continue;
2108        }
2109        let path = component_path_for_spec(root, spec);
2110        if !path.exists() {
2111            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2112            continue;
2113        }
2114        let bytes = std::fs::read(&path)
2115            .with_context(|| format!("failed to read component {}", path.display()))?;
2116        let component = Component::from_binary(engine, &bytes).with_context(|| {
2117            format!(
2118                "failed to compile component {} from {}",
2119                spec.id,
2120                path.display()
2121            )
2122        })?;
2123        into.insert(
2124            spec.id.clone(),
2125            PackComponent {
2126                name: spec.id.clone(),
2127                version: spec.version.clone(),
2128                component,
2129            },
2130        );
2131        missing.remove(&spec.id);
2132    }
2133    Ok(())
2134}
2135
2136fn load_components_from_archive(
2137    engine: &Engine,
2138    path: &Path,
2139    specs: &[ComponentSpec],
2140    missing: &mut HashSet<String>,
2141    into: &mut HashMap<String, PackComponent>,
2142) -> Result<()> {
2143    let mut archive = ZipArchive::new(File::open(path)?)
2144        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2145    for spec in specs {
2146        if !missing.contains(&spec.id) {
2147            continue;
2148        }
2149        let file_name = spec
2150            .legacy_path
2151            .clone()
2152            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2153        let bytes = match read_entry(&mut archive, &file_name) {
2154            Ok(bytes) => bytes,
2155            Err(err) => {
2156                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2157                continue;
2158            }
2159        };
2160        let component = Component::from_binary(engine, &bytes)
2161            .with_context(|| format!("failed to compile component {}", spec.id))?;
2162        into.insert(
2163            spec.id.clone(),
2164            PackComponent {
2165                name: spec.id.clone(),
2166                version: spec.version.clone(),
2167                component,
2168            },
2169        );
2170        missing.remove(&spec.id);
2171    }
2172    Ok(())
2173}
2174
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use serde_json::json;
    use std::collections::BTreeMap;

    /// A raw `templating.*` node must be rewritten into a `component.exec`
    /// node whose payload names the component, defaults the operation to
    /// `render`, and keeps the remaining keys as input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let start_node = NodeDoc {
            raw: BTreeMap::from([(
                "templating.handlebars".into(),
                json!({ "template": "Hi {{name}}" }),
            )]),
            routing: json!([{"out": true}]),
            ..Default::default()
        };
        let doc = FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: BTreeMap::new(),
            nodes: BTreeMap::from([("start".into(), start_node)]),
        };

        let normalized = normalize_flow_doc(doc);

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component"),
            Some(&json!("templating.handlebars"))
        );
        assert_eq!(payload.get("operation"), Some(&json!("render")));
        assert_eq!(
            payload.get("input"),
            Some(&json!({ "template": "Hi {{name}}" }))
        );
    }
}
2228
/// Summary of a pack: its identity, entry flows and secret requirements.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Pack version rendered as a string.
    pub version: String,
    /// Ids of the pack's entry flows; defaults to empty when absent from the
    /// serialized form.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secrets the pack declares it needs; defaults to empty when absent from
    /// the serialized form.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
2238
2239impl PackMetadata {
2240    fn from_wasm(bytes: &[u8]) -> Option<Self> {
2241        let parser = Parser::new(0);
2242        for payload in parser.parse_all(bytes) {
2243            let payload = payload.ok()?;
2244            match payload {
2245                Payload::CustomSection(section) => {
2246                    if section.name() == "greentic.manifest"
2247                        && let Ok(meta) = Self::from_bytes(section.data())
2248                    {
2249                        return Some(meta);
2250                    }
2251                }
2252                Payload::DataSection(reader) => {
2253                    for segment in reader.into_iter().flatten() {
2254                        if let Ok(meta) = Self::from_bytes(segment.data) {
2255                            return Some(meta);
2256                        }
2257                    }
2258                }
2259                _ => {}
2260            }
2261        }
2262        None
2263    }
2264
2265    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2266        #[derive(Deserialize)]
2267        struct RawManifest {
2268            pack_id: String,
2269            version: String,
2270            #[serde(default)]
2271            entry_flows: Vec<String>,
2272            #[serde(default)]
2273            flows: Vec<RawFlow>,
2274            #[serde(default)]
2275            secret_requirements: Vec<greentic_types::SecretRequirement>,
2276        }
2277
2278        #[derive(Deserialize)]
2279        struct RawFlow {
2280            id: String,
2281        }
2282
2283        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2284        let mut entry_flows = if manifest.entry_flows.is_empty() {
2285            manifest.flows.iter().map(|f| f.id.clone()).collect()
2286        } else {
2287            manifest.entry_flows.clone()
2288        };
2289        entry_flows.retain(|id| !id.is_empty());
2290        Ok(Self {
2291            pack_id: manifest.pack_id,
2292            version: manifest.version,
2293            entry_flows,
2294            secret_requirements: manifest.secret_requirements,
2295        })
2296    }
2297
2298    pub fn fallback(path: &Path) -> Self {
2299        let pack_id = path
2300            .file_stem()
2301            .map(|s| s.to_string_lossy().into_owned())
2302            .unwrap_or_else(|| "unknown-pack".to_string());
2303        Self {
2304            pack_id,
2305            version: "0.0.0".to_string(),
2306            entry_flows: Vec::new(),
2307            secret_requirements: Vec::new(),
2308        }
2309    }
2310
2311    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2312        let entry_flows = manifest
2313            .flows
2314            .iter()
2315            .map(|flow| flow.id.as_str().to_string())
2316            .collect::<Vec<_>>();
2317        Self {
2318            pack_id: manifest.pack_id.as_str().to_string(),
2319            version: manifest.version.to_string(),
2320            entry_flows,
2321            secret_requirements: manifest.secret_requirements.clone(),
2322        }
2323    }
2324}