Skip to main content

greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::{
10    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
13use crate::provider::{ProviderBinding, ProviderRegistry};
14use crate::provider_core::SchemaCorePre as ProviderComponentPre;
15use crate::provider_core_only;
16use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
17use anyhow::{Context, Result, anyhow, bail};
18use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20    self as host_v1, HostFns, add_all_v1_to_linker,
21    runner_host_http::RunnerHostHttp,
22    runner_host_kv::RunnerHostKv,
23    secrets_store::{SecretsError, SecretsStoreHost},
24    state_store::{
25        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26        StateStoreHost, TenantCtx as StateTenantCtx,
27    },
28    telemetry_logger::{
29        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31        TenantCtx as TelemetryTenantCtx,
32    },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::flow::FlowHasher;
38use greentic_types::{
39    ArtifactLocationV1, ComponentId, ComponentSourceRef, EXT_COMPONENT_SOURCES_V1, EnvId,
40    ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
41    NodeId, OutputMapping, Routing, StateKey as StoreStateKey, TeamId, TelemetryHints,
42    TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
43    pack_manifest::ExtensionInline,
44};
45use host_v1::http_client::{
46    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
47    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
48    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
49    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
50};
51use indexmap::IndexMap;
52use once_cell::sync::Lazy;
53use parking_lot::{Mutex, RwLock};
54use reqwest::blocking::Client as BlockingClient;
55use runner_core::normalize_under_root;
56use serde::{Deserialize, Serialize};
57use serde_cbor;
58use serde_json::{self, Value};
59use sha2::Digest;
60use tokio::fs;
61use wasmparser::{Parser, Payload};
62use wasmtime::StoreContextMut;
63use zip::ZipArchive;
64
65use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
66use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
67use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
68
69use crate::config::HostConfig;
70use crate::secrets::{DynSecretsManager, read_secret_blocking};
71use crate::storage::state::STATE_PREFIX;
72use crate::storage::{DynSessionStore, DynStateStore};
73use crate::verify;
74use crate::wasi::RunnerWasiPolicy;
75use tracing::warn;
76use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
77use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
78
79use greentic_flow::model::FlowDoc;
80
81#[allow(dead_code)]
82pub struct PackRuntime {
83    /// Component artifact path (wasm file).
84    path: PathBuf,
85    /// Optional archive (.gtpack) used to load flows/manifests.
86    archive_path: Option<PathBuf>,
87    config: Arc<HostConfig>,
88    engine: Engine,
89    metadata: PackMetadata,
90    manifest: Option<greentic_types::PackManifest>,
91    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
92    mocks: Option<Arc<MockLayer>>,
93    flows: Option<PackFlows>,
94    components: HashMap<String, PackComponent>,
95    http_client: Arc<BlockingClient>,
96    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
97    session_store: Option<DynSessionStore>,
98    state_store: Option<DynStateStore>,
99    wasi_policy: Arc<RunnerWasiPolicy>,
100    provider_registry: RwLock<Option<ProviderRegistry>>,
101    secrets: DynSecretsManager,
102    oauth_config: Option<OAuthBrokerConfig>,
103}
104
105struct PackComponent {
106    #[allow(dead_code)]
107    name: String,
108    #[allow(dead_code)]
109    version: String,
110    component: Component,
111}
112
113#[derive(Debug, Default, Clone)]
114pub struct ComponentResolution {
115    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
116    pub materialized_root: Option<PathBuf>,
117    /// Explicit overrides mapping component id -> wasm path.
118    pub overrides: HashMap<String, PathBuf>,
119    /// If true, do not fetch remote components; require cached artifacts.
120    pub dist_offline: bool,
121    /// Optional cache directory for resolved remote components.
122    pub dist_cache_dir: Option<PathBuf>,
123}
124
125fn build_blocking_client() -> BlockingClient {
126    std::thread::spawn(|| {
127        BlockingClient::builder()
128            .no_proxy()
129            .build()
130            .expect("blocking client")
131    })
132    .join()
133    .expect("client build thread panicked")
134}
135
136fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
137    let (root, candidate) = if path.is_absolute() {
138        let parent = path
139            .parent()
140            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
141        let root = parent
142            .canonicalize()
143            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
144        let file = path
145            .file_name()
146            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
147        (root, PathBuf::from(file))
148    } else {
149        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
150        let base = if let Some(parent) = path.parent() {
151            cwd.join(parent)
152        } else {
153            cwd
154        };
155        let root = base
156            .canonicalize()
157            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
158        let file = path
159            .file_name()
160            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
161        (root, PathBuf::from(file))
162    };
163    let safe = normalize_under_root(&root, &candidate)?;
164    Ok((root, safe))
165}
166
167static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
168
169#[derive(Debug, Clone, Serialize, Deserialize)]
170pub struct FlowDescriptor {
171    pub id: String,
172    #[serde(rename = "type")]
173    pub flow_type: String,
174    pub profile: String,
175    pub version: String,
176    #[serde(default)]
177    pub description: Option<String>,
178}
179
180pub struct HostState {
181    config: Arc<HostConfig>,
182    http_client: Arc<BlockingClient>,
183    default_env: String,
184    #[allow(dead_code)]
185    session_store: Option<DynSessionStore>,
186    state_store: Option<DynStateStore>,
187    mocks: Option<Arc<MockLayer>>,
188    secrets: DynSecretsManager,
189    oauth_config: Option<OAuthBrokerConfig>,
190    oauth_host: OAuthBrokerHost,
191}
192
193impl HostState {
194    #[allow(clippy::default_constructed_unit_structs)]
195    pub fn new(
196        config: Arc<HostConfig>,
197        http_client: Arc<BlockingClient>,
198        mocks: Option<Arc<MockLayer>>,
199        session_store: Option<DynSessionStore>,
200        state_store: Option<DynStateStore>,
201        secrets: DynSecretsManager,
202        oauth_config: Option<OAuthBrokerConfig>,
203    ) -> Result<Self> {
204        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
205        Ok(Self {
206            config,
207            http_client,
208            default_env,
209            session_store,
210            state_store,
211            mocks,
212            secrets,
213            oauth_config,
214            oauth_host: OAuthBrokerHost::default(),
215        })
216    }
217
218    pub fn get_secret(&self, key: &str) -> Result<String> {
219        if provider_core_only::is_enabled() {
220            bail!(provider_core_only::blocked_message("secrets"))
221        }
222        if !self.config.secrets_policy.is_allowed(key) {
223            bail!("secret {key} is not permitted by bindings policy");
224        }
225        if let Some(mock) = &self.mocks
226            && let Some(value) = mock.secrets_lookup(key)
227        {
228            return Ok(value);
229        }
230        let bytes = read_secret_blocking(&self.secrets, key)
231            .context("failed to read secret from manager")?;
232        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
233        Ok(value)
234    }
235
236    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
237        let tenant_raw = ctx
238            .as_ref()
239            .map(|ctx| ctx.tenant.clone())
240            .unwrap_or_else(|| self.config.tenant.clone());
241        let env_raw = ctx
242            .as_ref()
243            .map(|ctx| ctx.env.clone())
244            .unwrap_or_else(|| self.default_env.clone());
245        let tenant_id = TenantId::from_str(&tenant_raw)
246            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
247        let env_id = EnvId::from_str(&env_raw)
248            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
249        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
250        if let Some(ctx) = ctx {
251            if let Some(team) = ctx.team.or(ctx.team_id) {
252                let team_id =
253                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
254                tenant_ctx = tenant_ctx.with_team(Some(team_id));
255            }
256            if let Some(user) = ctx.user.or(ctx.user_id) {
257                let user_id =
258                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
259                tenant_ctx = tenant_ctx.with_user(Some(user_id));
260            }
261            if let Some(flow) = ctx.flow_id {
262                tenant_ctx = tenant_ctx.with_flow(flow);
263            }
264            if let Some(node) = ctx.node_id {
265                tenant_ctx = tenant_ctx.with_node(node);
266            }
267            if let Some(provider) = ctx.provider_id {
268                tenant_ctx = tenant_ctx.with_provider(provider);
269            }
270            if let Some(session) = ctx.session_id {
271                tenant_ctx = tenant_ctx.with_session(session);
272            }
273            tenant_ctx.trace_id = ctx.trace_id;
274        }
275        Ok(tenant_ctx)
276    }
277
278    fn send_http_request(
279        &mut self,
280        req: HttpRequest,
281        opts: Option<HttpRequestOptionsV1_1>,
282        _ctx: Option<HttpTenantCtx>,
283    ) -> Result<HttpResponse, HttpClientError> {
284        if !self.config.http_enabled {
285            return Err(HttpClientError {
286                code: "denied".into(),
287                message: "http client disabled by policy".into(),
288            });
289        }
290
291        let mut mock_state = None;
292        let raw_body = req.body.clone();
293        if let Some(mock) = &self.mocks
294            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
295        {
296            match mock.http_begin(&meta) {
297                HttpDecision::Mock(response) => {
298                    let headers = response
299                        .headers
300                        .iter()
301                        .map(|(k, v)| (k.clone(), v.clone()))
302                        .collect();
303                    return Ok(HttpResponse {
304                        status: response.status,
305                        headers,
306                        body: response.body.clone().map(|b| b.into_bytes()),
307                    });
308                }
309                HttpDecision::Deny(reason) => {
310                    return Err(HttpClientError {
311                        code: "denied".into(),
312                        message: reason,
313                    });
314                }
315                HttpDecision::Passthrough { record } => {
316                    mock_state = Some((meta, record));
317                }
318            }
319        }
320
321        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
322        let mut builder = self.http_client.request(method, &req.url);
323        for (key, value) in req.headers {
324            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
325                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
326            {
327                builder = builder.header(header, header_value);
328            }
329        }
330
331        if let Some(body) = raw_body.clone() {
332            builder = builder.body(body);
333        }
334
335        if let Some(opts) = opts {
336            if let Some(timeout_ms) = opts.timeout_ms {
337                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
338            }
339            if opts.allow_insecure == Some(true) {
340                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
341            }
342            if let Some(follow_redirects) = opts.follow_redirects
343                && !follow_redirects
344            {
345                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
346            }
347        }
348
349        let response = match builder.send() {
350            Ok(resp) => resp,
351            Err(err) => {
352                warn!(url = %req.url, error = %err, "http client request failed");
353                return Err(HttpClientError {
354                    code: "unavailable".into(),
355                    message: err.to_string(),
356                });
357            }
358        };
359
360        let status = response.status().as_u16();
361        let headers_vec = response
362            .headers()
363            .iter()
364            .map(|(k, v)| {
365                (
366                    k.as_str().to_string(),
367                    v.to_str().unwrap_or_default().to_string(),
368                )
369            })
370            .collect::<Vec<_>>();
371        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
372
373        if let Some((meta, true)) = mock_state.take()
374            && let Some(mock) = &self.mocks
375        {
376            let recorded = HttpMockResponse::new(
377                status,
378                headers_vec.clone().into_iter().collect(),
379                body_bytes
380                    .as_ref()
381                    .map(|b| String::from_utf8_lossy(b).into_owned()),
382            );
383            mock.http_record(&meta, &recorded);
384        }
385
386        Ok(HttpResponse {
387            status,
388            headers: headers_vec,
389            body: body_bytes,
390        })
391    }
392}
393
394impl SecretsStoreHost for HostState {
395    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
396        if provider_core_only::is_enabled() {
397            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
398            return Err(SecretsError::Denied);
399        }
400        if !self.config.secrets_policy.is_allowed(&key) {
401            return Err(SecretsError::Denied);
402        }
403        if let Some(mock) = &self.mocks
404            && let Some(value) = mock.secrets_lookup(&key)
405        {
406            return Ok(Some(value.into_bytes()));
407        }
408        match read_secret_blocking(&self.secrets, &key) {
409            Ok(bytes) => Ok(Some(bytes)),
410            Err(err) => {
411                warn!(secret = %key, error = %err, "secret lookup failed");
412                Err(SecretsError::NotFound)
413            }
414        }
415    }
416}
417
418impl HttpClientHost for HostState {
419    fn send(
420        &mut self,
421        req: HttpRequest,
422        ctx: Option<HttpTenantCtx>,
423    ) -> Result<HttpResponse, HttpClientError> {
424        self.send_http_request(req, None, ctx)
425    }
426}
427
428impl HttpClientHostV1_1 for HostState {
429    fn send(
430        &mut self,
431        req: HttpRequestV1_1,
432        opts: Option<HttpRequestOptionsV1_1>,
433        ctx: Option<HttpTenantCtxV1_1>,
434    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
435        let legacy_req = HttpRequest {
436            method: req.method,
437            url: req.url,
438            headers: req.headers,
439            body: req.body,
440        };
441        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
442            env: ctx.env,
443            tenant: ctx.tenant,
444            tenant_id: ctx.tenant_id,
445            team: ctx.team,
446            team_id: ctx.team_id,
447            user: ctx.user,
448            user_id: ctx.user_id,
449            trace_id: ctx.trace_id,
450            correlation_id: ctx.correlation_id,
451            attributes: ctx.attributes,
452            session_id: ctx.session_id,
453            flow_id: ctx.flow_id,
454            node_id: ctx.node_id,
455            provider_id: ctx.provider_id,
456            deadline_ms: ctx.deadline_ms,
457            attempt: ctx.attempt,
458            idempotency_key: ctx.idempotency_key,
459            impersonation: ctx
460                .impersonation
461                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
462                    actor_id,
463                    reason,
464                }),
465        });
466
467        self.send_http_request(legacy_req, opts, legacy_ctx)
468            .map(|resp| HttpResponseV1_1 {
469                status: resp.status,
470                headers: resp.headers,
471                body: resp.body,
472            })
473            .map_err(|err| HttpClientErrorV1_1 {
474                code: err.code,
475                message: err.message,
476            })
477    }
478}
479
480impl StateStoreHost for HostState {
481    fn read(
482        &mut self,
483        key: HostStateKey,
484        ctx: Option<StateTenantCtx>,
485    ) -> Result<Vec<u8>, StateError> {
486        let store = match self.state_store.as_ref() {
487            Some(store) => store.clone(),
488            None => {
489                return Err(StateError {
490                    code: "unavailable".into(),
491                    message: "state store not configured".into(),
492                });
493            }
494        };
495        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
496            Ok(ctx) => ctx,
497            Err(err) => {
498                return Err(StateError {
499                    code: "invalid-ctx".into(),
500                    message: err.to_string(),
501                });
502            }
503        };
504        let key = StoreStateKey::from(key);
505        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
506            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
507            Ok(None) => Err(StateError {
508                code: "not_found".into(),
509                message: "state key not found".into(),
510            }),
511            Err(err) => Err(StateError {
512                code: "internal".into(),
513                message: err.to_string(),
514            }),
515        }
516    }
517
518    fn write(
519        &mut self,
520        key: HostStateKey,
521        bytes: Vec<u8>,
522        ctx: Option<StateTenantCtx>,
523    ) -> Result<StateOpAck, StateError> {
524        let store = match self.state_store.as_ref() {
525            Some(store) => store.clone(),
526            None => {
527                return Err(StateError {
528                    code: "unavailable".into(),
529                    message: "state store not configured".into(),
530                });
531            }
532        };
533        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
534            Ok(ctx) => ctx,
535            Err(err) => {
536                return Err(StateError {
537                    code: "invalid-ctx".into(),
538                    message: err.to_string(),
539                });
540            }
541        };
542        let key = StoreStateKey::from(key);
543        let value = serde_json::from_slice(&bytes)
544            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
545        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
546            Ok(()) => Ok(StateOpAck::Ok),
547            Err(err) => Err(StateError {
548                code: "internal".into(),
549                message: err.to_string(),
550            }),
551        }
552    }
553
554    fn delete(
555        &mut self,
556        key: HostStateKey,
557        ctx: Option<StateTenantCtx>,
558    ) -> Result<StateOpAck, StateError> {
559        let store = match self.state_store.as_ref() {
560            Some(store) => store.clone(),
561            None => {
562                return Err(StateError {
563                    code: "unavailable".into(),
564                    message: "state store not configured".into(),
565                });
566            }
567        };
568        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
569            Ok(ctx) => ctx,
570            Err(err) => {
571                return Err(StateError {
572                    code: "invalid-ctx".into(),
573                    message: err.to_string(),
574                });
575            }
576        };
577        let key = StoreStateKey::from(key);
578        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
579            Ok(_) => Ok(StateOpAck::Ok),
580            Err(err) => Err(StateError {
581                code: "internal".into(),
582                message: err.to_string(),
583            }),
584        }
585    }
586}
587
588impl TelemetryLoggerHost for HostState {
589    fn log(
590        &mut self,
591        span: TelemetrySpanContext,
592        fields: Vec<(String, String)>,
593        _ctx: Option<TelemetryTenantCtx>,
594    ) -> Result<TelemetryAck, TelemetryError> {
595        if let Some(mock) = &self.mocks
596            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
597        {
598            return Ok(TelemetryAck::Ok);
599        }
600        let mut map = serde_json::Map::new();
601        for (k, v) in fields {
602            map.insert(k, Value::String(v));
603        }
604        tracing::info!(
605            tenant = %span.tenant,
606            flow_id = %span.flow_id,
607            node = ?span.node_id,
608            provider = %span.provider,
609            fields = %serde_json::Value::Object(map.clone()),
610            "telemetry log from pack"
611        );
612        Ok(TelemetryAck::Ok)
613    }
614}
615
616impl RunnerHostHttp for HostState {
617    fn request(
618        &mut self,
619        method: String,
620        url: String,
621        headers: Vec<String>,
622        body: Option<Vec<u8>>,
623    ) -> Result<Vec<u8>, String> {
624        let req = HttpRequest {
625            method,
626            url,
627            headers: headers
628                .chunks(2)
629                .filter_map(|chunk| {
630                    if chunk.len() == 2 {
631                        Some((chunk[0].clone(), chunk[1].clone()))
632                    } else {
633                        None
634                    }
635                })
636                .collect(),
637            body,
638        };
639        match HttpClientHost::send(self, req, None) {
640            Ok(resp) => Ok(resp.body.unwrap_or_default()),
641            Err(err) => Err(err.message),
642        }
643    }
644}
645
646impl RunnerHostKv for HostState {
647    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
648        None
649    }
650
651    fn put(&mut self, _ns: String, _key: String, _val: String) {}
652}
653
654enum ManifestLoad {
655    New {
656        manifest: Box<greentic_types::PackManifest>,
657        flows: PackFlows,
658    },
659    Legacy {
660        manifest: Box<legacy_pack::PackManifest>,
661        flows: PackFlows,
662    },
663}
664
665fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
666    let mut archive = ZipArchive::new(File::open(path)?)
667        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
668    let bytes = read_entry(&mut archive, "manifest.cbor")
669        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
670    match decode_pack_manifest(&bytes) {
671        Ok(manifest) => {
672            let cache = PackFlows::from_manifest(manifest.clone());
673            Ok(ManifestLoad::New {
674                manifest: Box::new(manifest),
675                flows: cache,
676            })
677        }
678        Err(err) => {
679            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
680            // Fall back to legacy pack manifest
681            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
682                .context("failed to decode legacy pack manifest from manifest.cbor")?;
683            let flows = load_legacy_flows(&mut archive, &legacy)?;
684            Ok(ManifestLoad::Legacy {
685                manifest: Box::new(legacy),
686                flows,
687            })
688        }
689    }
690}
691
692fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
693    let manifest_path = root.join("manifest.cbor");
694    let bytes = std::fs::read(&manifest_path)
695        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
696    match decode_pack_manifest(&bytes) {
697        Ok(manifest) => {
698            let cache = PackFlows::from_manifest(manifest.clone());
699            Ok(ManifestLoad::New {
700                manifest: Box::new(manifest),
701                flows: cache,
702            })
703        }
704        Err(err) => {
705            tracing::debug!(
706                error = %err,
707                pack = %root.display(),
708                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
709            );
710            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
711                .context("failed to decode legacy pack manifest from manifest.cbor")?;
712            let flows = load_legacy_flows_from_dir(root, &legacy)?;
713            Ok(ManifestLoad::Legacy {
714                manifest: Box::new(legacy),
715                flows,
716            })
717        }
718    }
719}
720
721fn load_legacy_flows(
722    archive: &mut ZipArchive<File>,
723    manifest: &legacy_pack::PackManifest,
724) -> Result<PackFlows> {
725    let mut flows = HashMap::new();
726    let mut descriptors = Vec::new();
727
728    for entry in &manifest.flows {
729        let bytes = read_entry(archive, &entry.file_json)
730            .with_context(|| format!("missing flow json {}", entry.file_json))?;
731        let doc: FlowDoc = serde_json::from_slice(&bytes)
732            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
733        let normalized = normalize_flow_doc(doc);
734        let flow_ir = flow_doc_to_ir(normalized)?;
735        let flow = flow_ir_to_flow(flow_ir)?;
736
737        descriptors.push(FlowDescriptor {
738            id: entry.id.clone(),
739            flow_type: entry.kind.clone(),
740            profile: manifest.meta.pack_id.clone(),
741            version: manifest.meta.version.to_string(),
742            description: None,
743        });
744        flows.insert(entry.id.clone(), flow);
745    }
746
747    let mut entry_flows = manifest.meta.entry_flows.clone();
748    if entry_flows.is_empty() {
749        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
750    }
751    let metadata = PackMetadata {
752        pack_id: manifest.meta.pack_id.clone(),
753        version: manifest.meta.version.to_string(),
754        entry_flows,
755        secret_requirements: Vec::new(),
756    };
757
758    Ok(PackFlows {
759        descriptors,
760        flows,
761        metadata,
762    })
763}
764
765fn load_legacy_flows_from_dir(
766    root: &Path,
767    manifest: &legacy_pack::PackManifest,
768) -> Result<PackFlows> {
769    let mut flows = HashMap::new();
770    let mut descriptors = Vec::new();
771
772    for entry in &manifest.flows {
773        let path = root.join(&entry.file_json);
774        let bytes = std::fs::read(&path)
775            .with_context(|| format!("missing flow json {}", path.display()))?;
776        let doc: FlowDoc = serde_json::from_slice(&bytes)
777            .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
778        let normalized = normalize_flow_doc(doc);
779        let flow_ir = flow_doc_to_ir(normalized)?;
780        let flow = flow_ir_to_flow(flow_ir)?;
781
782        descriptors.push(FlowDescriptor {
783            id: entry.id.clone(),
784            flow_type: entry.kind.clone(),
785            profile: manifest.meta.pack_id.clone(),
786            version: manifest.meta.version.to_string(),
787            description: None,
788        });
789        flows.insert(entry.id.clone(), flow);
790    }
791
792    let mut entry_flows = manifest.meta.entry_flows.clone();
793    if entry_flows.is_empty() {
794        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
795    }
796    let metadata = PackMetadata {
797        pack_id: manifest.meta.pack_id.clone(),
798        version: manifest.meta.version.to_string(),
799        entry_flows,
800        secret_requirements: Vec::new(),
801    };
802
803    Ok(PackFlows {
804        descriptors,
805        flows,
806        metadata,
807    })
808}
809
810pub struct ComponentState {
811    pub host: HostState,
812    wasi_ctx: WasiCtx,
813    resource_table: ResourceTable,
814}
815
816impl ComponentState {
817    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
818        let wasi_ctx = policy
819            .instantiate()
820            .context("failed to build WASI context")?;
821        Ok(Self {
822            host,
823            wasi_ctx,
824            resource_table: ResourceTable::new(),
825        })
826    }
827
828    fn host_mut(&mut self) -> &mut HostState {
829        &mut self.host
830    }
831
832    fn should_cancel_host(&mut self) -> bool {
833        false
834    }
835
836    fn yield_now_host(&mut self) {
837        // no-op cooperative yield
838    }
839}
840
841impl component_api::v0_4::greentic::component::control::Host for ComponentState {
842    fn should_cancel(&mut self) -> bool {
843        self.should_cancel_host()
844    }
845
846    fn yield_now(&mut self) {
847        self.yield_now_host();
848    }
849}
850
851impl component_api::v0_5::greentic::component::control::Host for ComponentState {
852    fn should_cancel(&mut self) -> bool {
853        self.should_cancel_host()
854    }
855
856    fn yield_now(&mut self) {
857        self.yield_now_host();
858    }
859}
860
861fn add_component_control_instance(
862    linker: &mut Linker<ComponentState>,
863    name: &str,
864) -> wasmtime::Result<()> {
865    let mut inst = linker.instance(name)?;
866    inst.func_wrap(
867        "should-cancel",
868        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
869            let host = caller.data_mut();
870            Ok((host.should_cancel_host(),))
871        },
872    )?;
873    inst.func_wrap(
874        "yield-now",
875        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
876            let host = caller.data_mut();
877            host.yield_now_host();
878            Ok(())
879        },
880    )?;
881    Ok(())
882}
883
884fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
885    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
886    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
887    Ok(())
888}
889
890pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
891    add_wasi_to_linker(linker)?;
892    add_all_v1_to_linker(
893        linker,
894        HostFns {
895            http_client_v1_1: Some(|state| state.host_mut()),
896            http_client: Some(|state| state.host_mut()),
897            oauth_broker: None,
898            runner_host_http: Some(|state| state.host_mut()),
899            runner_host_kv: Some(|state| state.host_mut()),
900            telemetry_logger: Some(|state| state.host_mut()),
901            state_store: Some(|state| state.host_mut()),
902            secrets_store: Some(|state| state.host_mut()),
903        },
904    )?;
905    Ok(())
906}
907
908impl OAuthHostContext for ComponentState {
909    fn tenant_id(&self) -> &str {
910        &self.host.config.tenant
911    }
912
913    fn env(&self) -> &str {
914        &self.host.default_env
915    }
916
917    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
918        &mut self.host.oauth_host
919    }
920
921    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
922        self.host.oauth_config.as_ref()
923    }
924}
925
926impl WasiView for ComponentState {
927    fn ctx(&mut self) -> WasiCtxView<'_> {
928        WasiCtxView {
929            ctx: &mut self.wasi_ctx,
930            table: &mut self.resource_table,
931        }
932    }
933}
934
935#[allow(unsafe_code)]
936unsafe impl Send for ComponentState {}
937#[allow(unsafe_code)]
938unsafe impl Sync for ComponentState {}
939
940impl PackRuntime {
941    #[allow(clippy::too_many_arguments)]
942    pub async fn load(
943        path: impl AsRef<Path>,
944        config: Arc<HostConfig>,
945        mocks: Option<Arc<MockLayer>>,
946        archive_source: Option<&Path>,
947        session_store: Option<DynSessionStore>,
948        state_store: Option<DynStateStore>,
949        wasi_policy: Arc<RunnerWasiPolicy>,
950        secrets: DynSecretsManager,
951        oauth_config: Option<OAuthBrokerConfig>,
952        verify_archive: bool,
953        component_resolution: ComponentResolution,
954    ) -> Result<Self> {
955        let path = path.as_ref();
956        let (_pack_root, safe_path) = normalize_pack_path(path)?;
957        let path_meta = std::fs::metadata(&safe_path).ok();
958        let is_dir = path_meta
959            .as_ref()
960            .map(|meta| meta.is_dir())
961            .unwrap_or(false);
962        let is_component = !is_dir
963            && safe_path
964                .extension()
965                .and_then(|ext| ext.to_str())
966                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
967                .unwrap_or(false);
968        let archive_hint_path = if let Some(source) = archive_source {
969            let (_, normalized) = normalize_pack_path(source)?;
970            Some(normalized)
971        } else if is_component || is_dir {
972            None
973        } else {
974            Some(safe_path.clone())
975        };
976        let archive_hint = archive_hint_path.as_deref();
977        if verify_archive {
978            if let Some(verify_target) = archive_hint.and_then(|p| {
979                std::fs::metadata(p)
980                    .ok()
981                    .filter(|meta| meta.is_file())
982                    .map(|_| p)
983            }) {
984                verify::verify_pack(verify_target).await?;
985                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
986            } else {
987                tracing::debug!("skipping archive verification (no archive source)");
988            }
989        }
990        let engine = Engine::default();
991        let mut metadata = PackMetadata::fallback(&safe_path);
992        let mut manifest = None;
993        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
994        let mut flows = None;
995        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
996            if is_dir {
997                Some(safe_path.clone())
998            } else {
999                None
1000            }
1001        });
1002
1003        if let Some(root) = materialized_root.as_ref() {
1004            match load_manifest_and_flows_from_dir(root) {
1005                Ok(ManifestLoad::New {
1006                    manifest: m,
1007                    flows: cache,
1008                }) => {
1009                    metadata = cache.metadata.clone();
1010                    manifest = Some(*m);
1011                    flows = Some(cache);
1012                }
1013                Ok(ManifestLoad::Legacy {
1014                    manifest: m,
1015                    flows: cache,
1016                }) => {
1017                    metadata = cache.metadata.clone();
1018                    legacy_manifest = Some(m);
1019                    flows = Some(cache);
1020                }
1021                Err(err) => {
1022                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1023                }
1024            }
1025        }
1026
1027        if manifest.is_none()
1028            && legacy_manifest.is_none()
1029            && let Some(archive_path) = archive_hint
1030        {
1031            match load_manifest_and_flows(archive_path) {
1032                Ok(ManifestLoad::New {
1033                    manifest: m,
1034                    flows: cache,
1035                }) => {
1036                    metadata = cache.metadata.clone();
1037                    manifest = Some(*m);
1038                    flows = Some(cache);
1039                }
1040                Ok(ManifestLoad::Legacy {
1041                    manifest: m,
1042                    flows: cache,
1043                }) => {
1044                    metadata = cache.metadata.clone();
1045                    legacy_manifest = Some(m);
1046                    flows = Some(cache);
1047                }
1048                Err(err) => {
1049                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1050                }
1051            }
1052        }
1053        let component_sources = if let Some(manifest) = manifest.as_ref() {
1054            component_sources_table(manifest).context("invalid component sources extension")?
1055        } else {
1056            None
1057        };
1058        let components = if is_component {
1059            let wasm_bytes = fs::read(&safe_path).await?;
1060            metadata = PackMetadata::from_wasm(&wasm_bytes)
1061                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1062            let name = safe_path
1063                .file_stem()
1064                .map(|s| s.to_string_lossy().to_string())
1065                .unwrap_or_else(|| "component".to_string());
1066            let component = Component::from_binary(&engine, &wasm_bytes)?;
1067            let mut map = HashMap::new();
1068            map.insert(
1069                name.clone(),
1070                PackComponent {
1071                    name,
1072                    version: metadata.version.clone(),
1073                    component,
1074                },
1075            );
1076            map
1077        } else {
1078            let specs = component_specs(manifest.as_ref(), legacy_manifest.as_deref());
1079            if specs.is_empty() {
1080                HashMap::new()
1081            } else {
1082                let mut loaded = HashMap::new();
1083                let mut missing: HashSet<String> =
1084                    specs.iter().map(|spec| spec.id.clone()).collect();
1085                let mut searched = Vec::new();
1086
1087                if !component_resolution.overrides.is_empty() {
1088                    load_components_from_overrides(
1089                        &engine,
1090                        &component_resolution.overrides,
1091                        &specs,
1092                        &mut missing,
1093                        &mut loaded,
1094                    )?;
1095                    searched.push("override map".to_string());
1096                }
1097
1098                if let Some(component_sources) = component_sources.as_ref() {
1099                    load_components_from_sources(
1100                        &engine,
1101                        component_sources,
1102                        &component_resolution,
1103                        &specs,
1104                        &mut missing,
1105                        &mut loaded,
1106                        materialized_root.as_deref(),
1107                        archive_hint,
1108                    )
1109                    .await?;
1110                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1111                }
1112
1113                if let Some(root) = materialized_root.as_ref() {
1114                    load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
1115                    searched.push(format!("components dir {}", root.display()));
1116                }
1117
1118                if let Some(archive_path) = archive_hint {
1119                    load_components_from_archive(
1120                        &engine,
1121                        archive_path,
1122                        &specs,
1123                        &mut missing,
1124                        &mut loaded,
1125                    )?;
1126                    searched.push(format!("archive {}", archive_path.display()));
1127                }
1128
1129                if !missing.is_empty() {
1130                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1131                    let sources = if searched.is_empty() {
1132                        "no component sources".to_string()
1133                    } else {
1134                        searched.join(", ")
1135                    };
1136                    bail!(
1137                        "components missing: {}; looked in {}",
1138                        missing_list,
1139                        sources
1140                    );
1141                }
1142
1143                loaded
1144            }
1145        };
1146        let http_client = Arc::clone(&HTTP_CLIENT);
1147        Ok(Self {
1148            path: safe_path,
1149            archive_path: archive_hint.map(Path::to_path_buf),
1150            config,
1151            engine,
1152            metadata,
1153            manifest,
1154            legacy_manifest,
1155            mocks,
1156            flows,
1157            components,
1158            http_client,
1159            pre_cache: Mutex::new(HashMap::new()),
1160            session_store,
1161            state_store,
1162            wasi_policy,
1163            provider_registry: RwLock::new(None),
1164            secrets,
1165            oauth_config,
1166        })
1167    }
1168
1169    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1170        if let Some(cache) = &self.flows {
1171            return Ok(cache.descriptors.clone());
1172        }
1173        if let Some(manifest) = &self.manifest {
1174            let descriptors = manifest
1175                .flows
1176                .iter()
1177                .map(|flow| FlowDescriptor {
1178                    id: flow.id.as_str().to_string(),
1179                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1180                    profile: manifest.pack_id.as_str().to_string(),
1181                    version: manifest.version.to_string(),
1182                    description: None,
1183                })
1184                .collect();
1185            return Ok(descriptors);
1186        }
1187        Ok(Vec::new())
1188    }
1189
1190    #[allow(dead_code)]
1191    pub async fn run_flow(
1192        &self,
1193        flow_id: &str,
1194        input: serde_json::Value,
1195    ) -> Result<serde_json::Value> {
1196        let pack = Arc::new(
1197            PackRuntime::load(
1198                &self.path,
1199                Arc::clone(&self.config),
1200                self.mocks.clone(),
1201                self.archive_path.as_deref(),
1202                self.session_store.clone(),
1203                self.state_store.clone(),
1204                Arc::clone(&self.wasi_policy),
1205                self.secrets.clone(),
1206                self.oauth_config.clone(),
1207                false,
1208                ComponentResolution::default(),
1209            )
1210            .await?,
1211        );
1212
1213        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1214        let retry_config = self.config.retry_config().into();
1215        let mocks = pack.mocks.as_deref();
1216        let tenant = self.config.tenant.as_str();
1217
1218        let ctx = FlowContext {
1219            tenant,
1220            flow_id,
1221            node_id: None,
1222            tool: None,
1223            action: None,
1224            session_id: None,
1225            provider_id: None,
1226            retry_config,
1227            observer: None,
1228            mocks,
1229        };
1230
1231        let execution = engine.execute(ctx, input).await?;
1232        match execution.status {
1233            FlowStatus::Completed => Ok(execution.output),
1234            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1235                "status": "pending",
1236                "reason": wait.reason,
1237                "resume": wait.snapshot,
1238                "response": execution.output,
1239            })),
1240        }
1241    }
1242
1243    pub async fn invoke_component(
1244        &self,
1245        component_ref: &str,
1246        ctx: ComponentExecCtx,
1247        operation: &str,
1248        _config_json: Option<String>,
1249        input_json: String,
1250    ) -> Result<Value> {
1251        let pack_component = self
1252            .components
1253            .get(component_ref)
1254            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1255
1256        let mut linker = Linker::new(&self.engine);
1257        register_all(&mut linker)?;
1258        add_component_control_to_linker(&mut linker)?;
1259
1260        let host_state = HostState::new(
1261            Arc::clone(&self.config),
1262            Arc::clone(&self.http_client),
1263            self.mocks.clone(),
1264            self.session_store.clone(),
1265            self.state_store.clone(),
1266            Arc::clone(&self.secrets),
1267            self.oauth_config.clone(),
1268        )?;
1269        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1270        let mut store = wasmtime::Store::new(&self.engine, store_state);
1271        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1272        let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1273            Ok(pre) => {
1274                let bindings = pre.instantiate_async(&mut store).await?;
1275                let node = bindings.greentic_component_node();
1276                let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1277                let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1278                component_api::invoke_result_from_v0_5(result)
1279            }
1280            Err(err) => {
1281                if is_missing_node_export(&err, "0.5.0") {
1282                    let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1283                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
1284                        component_api::v0_4::ComponentPre::new(pre_instance)?;
1285                    let bindings = pre.instantiate_async(&mut store).await?;
1286                    let node = bindings.greentic_component_node();
1287                    let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1288                    let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1289                    component_api::invoke_result_from_v0_4(result)
1290                } else {
1291                    return Err(err);
1292                }
1293            }
1294        };
1295
1296        match result {
1297            InvokeResult::Ok(body) => {
1298                if body.is_empty() {
1299                    return Ok(Value::Null);
1300                }
1301                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1302            }
1303            InvokeResult::Err(NodeError {
1304                code,
1305                message,
1306                retryable,
1307                backoff_ms,
1308                details,
1309            }) => {
1310                let mut obj = serde_json::Map::new();
1311                obj.insert("ok".into(), Value::Bool(false));
1312                let mut error = serde_json::Map::new();
1313                error.insert("code".into(), Value::String(code));
1314                error.insert("message".into(), Value::String(message));
1315                error.insert("retryable".into(), Value::Bool(retryable));
1316                if let Some(backoff) = backoff_ms {
1317                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1318                }
1319                if let Some(details) = details {
1320                    error.insert(
1321                        "details".into(),
1322                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1323                    );
1324                }
1325                obj.insert("error".into(), Value::Object(error));
1326                Ok(Value::Object(obj))
1327            }
1328        }
1329    }
1330
1331    pub fn resolve_provider(
1332        &self,
1333        provider_id: Option<&str>,
1334        provider_type: Option<&str>,
1335    ) -> Result<ProviderBinding> {
1336        let registry = self.provider_registry()?;
1337        registry.resolve(provider_id, provider_type)
1338    }
1339
1340    pub async fn invoke_provider(
1341        &self,
1342        binding: &ProviderBinding,
1343        _ctx: ComponentExecCtx,
1344        op: &str,
1345        input_json: Vec<u8>,
1346    ) -> Result<Value> {
1347        let component_ref = &binding.component_ref;
1348        let pack_component = self
1349            .components
1350            .get(component_ref)
1351            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1352
1353        let mut linker = Linker::new(&self.engine);
1354        register_all(&mut linker)?;
1355        add_component_control_to_linker(&mut linker)?;
1356        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1357        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1358
1359        let host_state = HostState::new(
1360            Arc::clone(&self.config),
1361            Arc::clone(&self.http_client),
1362            self.mocks.clone(),
1363            self.session_store.clone(),
1364            self.state_store.clone(),
1365            Arc::clone(&self.secrets),
1366            self.oauth_config.clone(),
1367        )?;
1368        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1369        let mut store = wasmtime::Store::new(&self.engine, store_state);
1370        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1371        let provider = bindings.greentic_provider_core_schema_core_api();
1372
1373        let result = provider.call_invoke(&mut store, op, &input_json)?;
1374        deserialize_json_bytes(result)
1375    }
1376
1377    fn provider_registry(&self) -> Result<ProviderRegistry> {
1378        if let Some(registry) = self.provider_registry.read().clone() {
1379            return Ok(registry);
1380        }
1381        let manifest = self
1382            .manifest
1383            .as_ref()
1384            .context("pack manifest required for provider resolution")?;
1385        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1386        let registry = ProviderRegistry::new(
1387            manifest,
1388            self.state_store.clone(),
1389            &self.config.tenant,
1390            &env,
1391        )?;
1392        *self.provider_registry.write() = Some(registry.clone());
1393        Ok(registry)
1394    }
1395
1396    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1397        if let Some(cache) = &self.flows {
1398            return cache
1399                .flows
1400                .get(flow_id)
1401                .cloned()
1402                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1403        }
1404        if let Some(manifest) = &self.manifest {
1405            let entry = manifest
1406                .flows
1407                .iter()
1408                .find(|f| f.id.as_str() == flow_id)
1409                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1410            return Ok(entry.flow.clone());
1411        }
1412        bail!("flow '{flow_id}' not available (pack exports disabled)")
1413    }
1414
1415    pub fn metadata(&self) -> &PackMetadata {
1416        &self.metadata
1417    }
1418
1419    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1420        &self.metadata.secret_requirements
1421    }
1422
1423    pub fn missing_secrets(
1424        &self,
1425        tenant_ctx: &TypesTenantCtx,
1426    ) -> Vec<greentic_types::SecretRequirement> {
1427        let env = tenant_ctx.env.as_str().to_string();
1428        let tenant = tenant_ctx.tenant.as_str().to_string();
1429        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1430        self.required_secrets()
1431            .iter()
1432            .filter(|req| {
1433                // scope must match current context if provided
1434                if let Some(scope) = &req.scope {
1435                    if scope.env != env {
1436                        return false;
1437                    }
1438                    if scope.tenant != tenant {
1439                        return false;
1440                    }
1441                    if let Some(ref team_req) = scope.team
1442                        && team.as_ref() != Some(team_req)
1443                    {
1444                        return false;
1445                    }
1446                }
1447                read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1448            })
1449            .cloned()
1450            .collect()
1451    }
1452
1453    pub fn for_component_test(
1454        components: Vec<(String, PathBuf)>,
1455        flows: HashMap<String, FlowIR>,
1456        config: Arc<HostConfig>,
1457    ) -> Result<Self> {
1458        let engine = Engine::default();
1459        let mut component_map = HashMap::new();
1460        for (name, path) in components {
1461            if !path.exists() {
1462                bail!("component artifact missing: {}", path.display());
1463            }
1464            let wasm_bytes = std::fs::read(&path)?;
1465            let component = Component::from_binary(&engine, &wasm_bytes)
1466                .with_context(|| format!("failed to compile component {}", path.display()))?;
1467            component_map.insert(
1468                name.clone(),
1469                PackComponent {
1470                    name,
1471                    version: "0.0.0".into(),
1472                    component,
1473                },
1474            );
1475        }
1476
1477        let mut flow_map = HashMap::new();
1478        let mut descriptors = Vec::new();
1479        for (id, ir) in flows {
1480            let flow_type = ir.flow_type.clone();
1481            let flow = flow_ir_to_flow(ir)?;
1482            flow_map.insert(id.clone(), flow);
1483            descriptors.push(FlowDescriptor {
1484                id: id.clone(),
1485                flow_type,
1486                profile: "test".into(),
1487                version: "0.0.0".into(),
1488                description: None,
1489            });
1490        }
1491        let flows_cache = PackFlows {
1492            descriptors: descriptors.clone(),
1493            flows: flow_map,
1494            metadata: PackMetadata::fallback(Path::new("component-test")),
1495        };
1496
1497        Ok(Self {
1498            path: PathBuf::new(),
1499            archive_path: None,
1500            config,
1501            engine,
1502            metadata: PackMetadata::fallback(Path::new("component-test")),
1503            manifest: None,
1504            legacy_manifest: None,
1505            mocks: None,
1506            flows: Some(flows_cache),
1507            components: component_map,
1508            http_client: Arc::clone(&HTTP_CLIENT),
1509            pre_cache: Mutex::new(HashMap::new()),
1510            session_store: None,
1511            state_store: None,
1512            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1513            provider_registry: RwLock::new(None),
1514            secrets: crate::secrets::default_manager(),
1515            oauth_config: None,
1516        })
1517    }
1518}
1519
1520fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1521    let message = err.to_string();
1522    message.contains("no exported instance named")
1523        && message.contains(&format!("greentic:component/node@{version}"))
1524}
1525
1526struct PackFlows {
1527    descriptors: Vec<FlowDescriptor>,
1528    flows: HashMap<String, Flow>,
1529    metadata: PackMetadata,
1530}
1531
1532const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
1533    "greentic.pack.runtime_flow",
1534    "greentic.pack.flow_runtime",
1535    "greentic.pack.runtime_flows",
1536];
1537
1538#[derive(Debug, Deserialize)]
1539struct RuntimeFlowBundle {
1540    flows: Vec<RuntimeFlow>,
1541}
1542
1543#[derive(Debug, Deserialize)]
1544struct RuntimeFlow {
1545    id: String,
1546    #[serde(alias = "flow_type")]
1547    kind: FlowKind,
1548    #[serde(default)]
1549    schema_version: Option<String>,
1550    #[serde(default)]
1551    start: Option<String>,
1552    #[serde(default)]
1553    entrypoints: BTreeMap<String, Value>,
1554    nodes: BTreeMap<String, RuntimeNode>,
1555    #[serde(default)]
1556    metadata: Option<FlowMetadata>,
1557}
1558
1559#[derive(Debug, Deserialize)]
1560struct RuntimeNode {
1561    #[serde(alias = "component")]
1562    component_id: String,
1563    #[serde(default, alias = "operation")]
1564    operation_name: Option<String>,
1565    #[serde(default, alias = "payload", alias = "input")]
1566    operation_payload: Value,
1567    #[serde(default)]
1568    routing: Option<Routing>,
1569    #[serde(default)]
1570    telemetry: Option<TelemetryHints>,
1571}
1572
1573fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1574    if bytes.is_empty() {
1575        return Ok(Value::Null);
1576    }
1577    serde_json::from_slice(&bytes).or_else(|_| {
1578        String::from_utf8(bytes)
1579            .map(Value::String)
1580            .map_err(|err| anyhow!(err))
1581    })
1582}
1583
1584impl PackFlows {
1585    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1586        if let Some(flows) = flows_from_runtime_extension(&manifest) {
1587            return flows;
1588        }
1589        let descriptors = manifest
1590            .flows
1591            .iter()
1592            .map(|entry| FlowDescriptor {
1593                id: entry.id.as_str().to_string(),
1594                flow_type: flow_kind_to_str(entry.kind).to_string(),
1595                profile: manifest.pack_id.as_str().to_string(),
1596                version: manifest.version.to_string(),
1597                description: None,
1598            })
1599            .collect();
1600        let mut flows = HashMap::new();
1601        for entry in &manifest.flows {
1602            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1603        }
1604        Self {
1605            metadata: PackMetadata::from_manifest(&manifest),
1606            descriptors,
1607            flows,
1608        }
1609    }
1610}
1611
1612fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1613    let extensions = manifest.extensions.as_ref()?;
1614    let extension = extensions.iter().find_map(|(key, ext)| {
1615        if RUNTIME_FLOW_EXTENSION_IDS
1616            .iter()
1617            .any(|candidate| candidate == key)
1618        {
1619            Some(ext)
1620        } else {
1621            None
1622        }
1623    })?;
1624    let runtime_flows = match decode_runtime_flow_extension(extension) {
1625        Some(flows) if !flows.is_empty() => flows,
1626        _ => return None,
1627    };
1628
1629    let descriptors = runtime_flows
1630        .iter()
1631        .map(|flow| FlowDescriptor {
1632            id: flow.id.as_str().to_string(),
1633            flow_type: flow_kind_to_str(flow.kind).to_string(),
1634            profile: manifest.pack_id.as_str().to_string(),
1635            version: manifest.version.to_string(),
1636            description: None,
1637        })
1638        .collect::<Vec<_>>();
1639    let flows = runtime_flows
1640        .into_iter()
1641        .map(|flow| (flow.id.as_str().to_string(), flow))
1642        .collect();
1643
1644    Some(PackFlows {
1645        metadata: PackMetadata::from_manifest(manifest),
1646        descriptors,
1647        flows,
1648    })
1649}
1650
1651fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1652    let value = match extension.inline.as_ref()? {
1653        ExtensionInline::Other(value) => value.clone(),
1654        _ => return None,
1655    };
1656
1657    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1658        return Some(collect_runtime_flows(bundle.flows));
1659    }
1660
1661    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1662        return Some(collect_runtime_flows(flows));
1663    }
1664
1665    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1666        return Some(flows);
1667    }
1668
1669    warn!(
1670        extension = %extension.kind,
1671        version = %extension.version,
1672        "runtime flow extension present but could not be decoded"
1673    );
1674    None
1675}
1676
1677fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1678    flows
1679        .into_iter()
1680        .filter_map(|flow| match runtime_flow_to_flow(flow) {
1681            Ok(flow) => Some(flow),
1682            Err(err) => {
1683                warn!(error = %err, "failed to decode runtime flow");
1684                None
1685            }
1686        })
1687        .collect()
1688}
1689
1690fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1691    let flow_id = FlowId::from_str(&runtime.id)
1692        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1693    let mut entrypoints = runtime.entrypoints;
1694    if entrypoints.is_empty()
1695        && let Some(start) = &runtime.start
1696    {
1697        entrypoints.insert("default".into(), Value::String(start.clone()));
1698    }
1699
1700    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1701    for (id, node) in runtime.nodes {
1702        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1703        let component_id = ComponentId::from_str(&node.component_id)
1704            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1705        let component = FlowComponentRef {
1706            id: component_id,
1707            pack_alias: None,
1708            operation: node.operation_name,
1709        };
1710        let routing = node.routing.unwrap_or(Routing::End);
1711        let telemetry = node.telemetry.unwrap_or_default();
1712        nodes.insert(
1713            node_id.clone(),
1714            Node {
1715                id: node_id,
1716                component,
1717                input: InputMapping {
1718                    mapping: node.operation_payload,
1719                },
1720                output: OutputMapping {
1721                    mapping: Value::Null,
1722                },
1723                routing,
1724                telemetry,
1725            },
1726        );
1727    }
1728
1729    Ok(Flow {
1730        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1731        id: flow_id,
1732        kind: runtime.kind,
1733        entrypoints,
1734        nodes,
1735        metadata: runtime.metadata.unwrap_or_default(),
1736    })
1737}
1738
1739fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1740    match kind {
1741        greentic_types::FlowKind::Messaging => "messaging",
1742        greentic_types::FlowKind::Event => "event",
1743        greentic_types::FlowKind::ComponentConfig => "component-config",
1744        greentic_types::FlowKind::Job => "job",
1745        greentic_types::FlowKind::Http => "http",
1746    }
1747}
1748
1749fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1750    let mut file = archive
1751        .by_name(name)
1752        .with_context(|| format!("entry {name} missing from archive"))?;
1753    let mut buf = Vec::new();
1754    file.read_to_end(&mut buf)?;
1755    Ok(buf)
1756}
1757
1758fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1759    for node in doc.nodes.values_mut() {
1760        let Some((component_ref, payload)) = node
1761            .raw
1762            .iter()
1763            .next()
1764            .map(|(key, value)| (key.clone(), value.clone()))
1765        else {
1766            continue;
1767        };
1768        if component_ref.starts_with("emit.") {
1769            node.operation = Some(component_ref);
1770            node.payload = payload;
1771            node.raw.clear();
1772            continue;
1773        }
1774        let (target_component, operation, input, config) =
1775            infer_component_exec(&payload, &component_ref);
1776        let mut payload_obj = serde_json::Map::new();
1777        // component.exec is meta; ensure the payload carries the actual target component.
1778        payload_obj.insert("component".into(), Value::String(target_component));
1779        payload_obj.insert("operation".into(), Value::String(operation));
1780        payload_obj.insert("input".into(), input);
1781        if let Some(cfg) = config {
1782            payload_obj.insert("config".into(), cfg);
1783        }
1784        node.operation = Some("component.exec".to_string());
1785        node.payload = Value::Object(payload_obj);
1786        node.raw.clear();
1787    }
1788    doc
1789}
1790
1791fn infer_component_exec(
1792    payload: &Value,
1793    component_ref: &str,
1794) -> (String, String, Value, Option<Value>) {
1795    let default_op = if component_ref.starts_with("templating.") {
1796        "render"
1797    } else {
1798        "invoke"
1799    }
1800    .to_string();
1801
1802    if let Value::Object(map) = payload {
1803        let op = map
1804            .get("op")
1805            .or_else(|| map.get("operation"))
1806            .and_then(Value::as_str)
1807            .map(|s| s.to_string())
1808            .unwrap_or_else(|| default_op.clone());
1809
1810        let mut input = map.clone();
1811        let config = input.remove("config");
1812        let component = input
1813            .get("component")
1814            .or_else(|| input.get("component_ref"))
1815            .and_then(Value::as_str)
1816            .map(|s| s.to_string())
1817            .unwrap_or_else(|| component_ref.to_string());
1818        input.remove("component");
1819        input.remove("component_ref");
1820        input.remove("op");
1821        input.remove("operation");
1822        return (component, op, Value::Object(input), config);
1823    }
1824
1825    (component_ref.to_string(), default_op, payload.clone(), None)
1826}
1827
1828#[derive(Clone, Debug)]
1829struct ComponentSpec {
1830    id: String,
1831    version: String,
1832    legacy_path: Option<String>,
1833}
1834
1835#[derive(Clone, Debug)]
1836struct ComponentSourceInfo {
1837    digest: String,
1838    source: ComponentSourceRef,
1839    artifact: ComponentArtifactLocation,
1840}
1841
1842#[derive(Clone, Debug)]
1843enum ComponentArtifactLocation {
1844    Inline { wasm_path: String },
1845    Remote,
1846}
1847
1848fn component_specs(
1849    manifest: Option<&greentic_types::PackManifest>,
1850    legacy_manifest: Option<&legacy_pack::PackManifest>,
1851) -> Vec<ComponentSpec> {
1852    if let Some(manifest) = manifest {
1853        return manifest
1854            .components
1855            .iter()
1856            .map(|entry| ComponentSpec {
1857                id: entry.id.as_str().to_string(),
1858                version: entry.version.to_string(),
1859                legacy_path: None,
1860            })
1861            .collect();
1862    }
1863    if let Some(legacy_manifest) = legacy_manifest {
1864        return legacy_manifest
1865            .components
1866            .iter()
1867            .map(|entry| ComponentSpec {
1868                id: entry.name.clone(),
1869                version: entry.version.to_string(),
1870                legacy_path: Some(entry.file_wasm.clone()),
1871            })
1872            .collect();
1873    }
1874    Vec::new()
1875}
1876
1877fn component_sources_table(
1878    manifest: &greentic_types::PackManifest,
1879) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
1880    let Some(sources) = manifest.get_component_sources_v1()? else {
1881        return Ok(None);
1882    };
1883    let mut table = HashMap::new();
1884    for entry in sources.components {
1885        let artifact = match entry.artifact {
1886            ArtifactLocationV1::Inline { wasm_path, .. } => {
1887                ComponentArtifactLocation::Inline { wasm_path }
1888            }
1889            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
1890        };
1891        let info = ComponentSourceInfo {
1892            digest: entry.resolved.digest,
1893            source: entry.source,
1894            artifact,
1895        };
1896        if let Some(component_id) = entry.component_id.as_ref() {
1897            table.insert(component_id.as_str().to_string(), info.clone());
1898        }
1899        table.insert(entry.name, info);
1900    }
1901    Ok(Some(table))
1902}
1903
1904fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
1905    if let Some(path) = &spec.legacy_path {
1906        return root.join(path);
1907    }
1908    root.join("components").join(format!("{}.wasm", spec.id))
1909}
1910
1911fn normalize_digest(digest: &str) -> String {
1912    if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
1913        digest.to_string()
1914    } else {
1915        format!("sha256:{digest}")
1916    }
1917}
1918
1919fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
1920    if digest.starts_with("blake3:") {
1921        let hash = blake3::hash(bytes);
1922        return Ok(format!("blake3:{}", hash.to_hex()));
1923    }
1924    let mut hasher = sha2::Sha256::new();
1925    hasher.update(bytes);
1926    Ok(format!("sha256:{:x}", hasher.finalize()))
1927}
1928
1929fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
1930    let normalized_expected = normalize_digest(expected);
1931    let actual = compute_digest_for(bytes, &normalized_expected)?;
1932    if normalize_digest(&actual) != normalized_expected {
1933        bail!(
1934            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
1935        );
1936    }
1937    Ok(())
1938}
1939
1940fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
1941    let mut opts = DistOptions {
1942        allow_tags: true,
1943        ..DistOptions::default()
1944    };
1945    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
1946        opts.cache_dir = cache_dir;
1947    }
1948    if component_resolution.dist_offline {
1949        opts.offline = true;
1950    }
1951    opts
1952}
1953
1954#[allow(clippy::too_many_arguments)]
1955async fn load_components_from_sources(
1956    engine: &Engine,
1957    component_sources: &HashMap<String, ComponentSourceInfo>,
1958    component_resolution: &ComponentResolution,
1959    specs: &[ComponentSpec],
1960    missing: &mut HashSet<String>,
1961    into: &mut HashMap<String, PackComponent>,
1962    materialized_root: Option<&Path>,
1963    archive_hint: Option<&Path>,
1964) -> Result<()> {
1965    let mut archive = if let Some(path) = archive_hint {
1966        Some(
1967            ZipArchive::new(File::open(path)?)
1968                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
1969        )
1970    } else {
1971        None
1972    };
1973    let mut dist_client: Option<DistClient> = None;
1974
1975    for spec in specs {
1976        if !missing.contains(&spec.id) {
1977            continue;
1978        }
1979        let Some(source) = component_sources.get(&spec.id) else {
1980            continue;
1981        };
1982
1983        let bytes = match &source.artifact {
1984            ComponentArtifactLocation::Inline { wasm_path } => {
1985                if let Some(root) = materialized_root {
1986                    let path = root.join(wasm_path);
1987                    if path.exists() {
1988                        std::fs::read(&path).with_context(|| {
1989                            format!(
1990                                "failed to read inline component {} from {}",
1991                                spec.id,
1992                                path.display()
1993                            )
1994                        })?
1995                    } else if archive.is_none() {
1996                        bail!("inline component {} missing at {}", spec.id, path.display());
1997                    } else {
1998                        read_entry(
1999                            archive.as_mut().expect("archive present when needed"),
2000                            wasm_path,
2001                        )
2002                        .with_context(|| {
2003                            format!(
2004                                "inline component {} missing at {} in pack archive",
2005                                spec.id, wasm_path
2006                            )
2007                        })?
2008                    }
2009                } else if let Some(archive) = archive.as_mut() {
2010                    read_entry(archive, wasm_path).with_context(|| {
2011                        format!(
2012                            "inline component {} missing at {} in pack archive",
2013                            spec.id, wasm_path
2014                        )
2015                    })?
2016                } else {
2017                    bail!(
2018                        "inline component {} missing and no pack source available",
2019                        spec.id
2020                    );
2021                }
2022            }
2023            ComponentArtifactLocation::Remote => {
2024                let client = dist_client.get_or_insert_with(|| {
2025                    DistClient::new(dist_options_from(component_resolution))
2026                });
2027                let reference = source.source.to_string();
2028                let cache_path = if component_resolution.dist_offline {
2029                    client
2030                        .fetch_digest(&source.digest)
2031                        .await
2032                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2033                } else {
2034                    let resolved = client
2035                        .resolve_ref(&reference)
2036                        .await
2037                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2038                    let expected = normalize_digest(&source.digest);
2039                    let actual = normalize_digest(&resolved.digest);
2040                    if expected != actual {
2041                        bail!(
2042                            "component {} digest mismatch after fetch: expected {}, got {}",
2043                            spec.id,
2044                            expected,
2045                            actual
2046                        );
2047                    }
2048                    resolved.cache_path.ok_or_else(|| {
2049                        anyhow!(
2050                            "component {} resolved from {} but cache path is missing",
2051                            spec.id,
2052                            reference
2053                        )
2054                    })?
2055                };
2056                std::fs::read(&cache_path).with_context(|| {
2057                    format!(
2058                        "failed to read cached component {} from {}",
2059                        spec.id,
2060                        cache_path.display()
2061                    )
2062                })?
2063            }
2064        };
2065
2066        verify_component_digest(&spec.id, &source.digest, &bytes)?;
2067        let component = Component::from_binary(engine, &bytes)
2068            .with_context(|| format!("failed to compile component {}", spec.id))?;
2069        into.insert(
2070            spec.id.clone(),
2071            PackComponent {
2072                name: spec.id.clone(),
2073                version: spec.version.clone(),
2074                component,
2075            },
2076        );
2077        missing.remove(&spec.id);
2078    }
2079
2080    Ok(())
2081}
2082
2083fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2084    match err {
2085        DistError::CacheMiss { reference: missing } => anyhow!(
2086            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2087            component_id,
2088            missing,
2089            reference
2090        ),
2091        DistError::Offline { reference: blocked } => anyhow!(
2092            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2093            component_id,
2094            blocked,
2095            reference
2096        ),
2097        DistError::AuthRequired { target } => anyhow!(
2098            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2099            component_id,
2100            target,
2101            reference
2102        ),
2103        other => anyhow!(
2104            "failed to resolve component {} from {}: {}",
2105            component_id,
2106            reference,
2107            other
2108        ),
2109    }
2110}
2111
2112fn load_components_from_overrides(
2113    engine: &Engine,
2114    overrides: &HashMap<String, PathBuf>,
2115    specs: &[ComponentSpec],
2116    missing: &mut HashSet<String>,
2117    into: &mut HashMap<String, PackComponent>,
2118) -> Result<()> {
2119    for spec in specs {
2120        if !missing.contains(&spec.id) {
2121            continue;
2122        }
2123        let Some(path) = overrides.get(&spec.id) else {
2124            continue;
2125        };
2126        let bytes = std::fs::read(path)
2127            .with_context(|| format!("failed to read override component {}", path.display()))?;
2128        let component = Component::from_binary(engine, &bytes).with_context(|| {
2129            format!(
2130                "failed to compile component {} from override {}",
2131                spec.id,
2132                path.display()
2133            )
2134        })?;
2135        into.insert(
2136            spec.id.clone(),
2137            PackComponent {
2138                name: spec.id.clone(),
2139                version: spec.version.clone(),
2140                component,
2141            },
2142        );
2143        missing.remove(&spec.id);
2144    }
2145    Ok(())
2146}
2147
2148fn load_components_from_dir(
2149    engine: &Engine,
2150    root: &Path,
2151    specs: &[ComponentSpec],
2152    missing: &mut HashSet<String>,
2153    into: &mut HashMap<String, PackComponent>,
2154) -> Result<()> {
2155    for spec in specs {
2156        if !missing.contains(&spec.id) {
2157            continue;
2158        }
2159        let path = component_path_for_spec(root, spec);
2160        if !path.exists() {
2161            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2162            continue;
2163        }
2164        let bytes = std::fs::read(&path)
2165            .with_context(|| format!("failed to read component {}", path.display()))?;
2166        let component = Component::from_binary(engine, &bytes).with_context(|| {
2167            format!(
2168                "failed to compile component {} from {}",
2169                spec.id,
2170                path.display()
2171            )
2172        })?;
2173        into.insert(
2174            spec.id.clone(),
2175            PackComponent {
2176                name: spec.id.clone(),
2177                version: spec.version.clone(),
2178                component,
2179            },
2180        );
2181        missing.remove(&spec.id);
2182    }
2183    Ok(())
2184}
2185
2186fn load_components_from_archive(
2187    engine: &Engine,
2188    path: &Path,
2189    specs: &[ComponentSpec],
2190    missing: &mut HashSet<String>,
2191    into: &mut HashMap<String, PackComponent>,
2192) -> Result<()> {
2193    let mut archive = ZipArchive::new(File::open(path)?)
2194        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2195    for spec in specs {
2196        if !missing.contains(&spec.id) {
2197            continue;
2198        }
2199        let file_name = spec
2200            .legacy_path
2201            .clone()
2202            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2203        let bytes = match read_entry(&mut archive, &file_name) {
2204            Ok(bytes) => bytes,
2205            Err(err) => {
2206                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2207                continue;
2208            }
2209        };
2210        let component = Component::from_binary(engine, &bytes)
2211            .with_context(|| format!("failed to compile component {}", spec.id))?;
2212        into.insert(
2213            spec.id.clone(),
2214            PackComponent {
2215                name: spec.id.clone(),
2216                version: spec.version.clone(),
2217                component,
2218            },
2219        );
2220        missing.remove(&spec.id);
2221    }
2222    Ok(())
2223}
2224
2225#[cfg(test)]
2226mod tests {
2227    use super::*;
2228    use greentic_flow::model::{FlowDoc, NodeDoc};
2229    use indexmap::IndexMap;
2230    use serde_json::json;
2231
2232    #[test]
2233    fn normalizes_raw_component_to_component_exec() {
2234        let mut nodes = IndexMap::new();
2235        let mut raw = IndexMap::new();
2236        raw.insert(
2237            "templating.handlebars".into(),
2238            json!({ "template": "Hi {{name}}" }),
2239        );
2240        nodes.insert(
2241            "start".into(),
2242            NodeDoc {
2243                raw,
2244                routing: json!([{"out": true}]),
2245                ..Default::default()
2246            },
2247        );
2248        let doc = FlowDoc {
2249            id: "welcome".into(),
2250            title: None,
2251            description: None,
2252            flow_type: "messaging".into(),
2253            start: Some("start".into()),
2254            parameters: json!({}),
2255            tags: Vec::new(),
2256            schema_version: None,
2257            entrypoints: IndexMap::new(),
2258            nodes,
2259        };
2260
2261        let normalized = normalize_flow_doc(doc);
2262        let node = normalized.nodes.get("start").expect("node exists");
2263        assert_eq!(node.operation.as_deref(), Some("component.exec"));
2264        assert!(node.raw.is_empty());
2265        let payload = node.payload.as_object().expect("payload object");
2266        assert_eq!(
2267            payload.get("component"),
2268            Some(&Value::String("templating.handlebars".into()))
2269        );
2270        assert_eq!(
2271            payload.get("operation"),
2272            Some(&Value::String("render".into()))
2273        );
2274        let input = payload.get("input").unwrap();
2275        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
2276    }
2277}
2278
2279#[derive(Clone, Debug, Default, Serialize, Deserialize)]
2280pub struct PackMetadata {
2281    pub pack_id: String,
2282    pub version: String,
2283    #[serde(default)]
2284    pub entry_flows: Vec<String>,
2285    #[serde(default)]
2286    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
2287}
2288
2289impl PackMetadata {
2290    fn from_wasm(bytes: &[u8]) -> Option<Self> {
2291        let parser = Parser::new(0);
2292        for payload in parser.parse_all(bytes) {
2293            let payload = payload.ok()?;
2294            match payload {
2295                Payload::CustomSection(section) => {
2296                    if section.name() == "greentic.manifest"
2297                        && let Ok(meta) = Self::from_bytes(section.data())
2298                    {
2299                        return Some(meta);
2300                    }
2301                }
2302                Payload::DataSection(reader) => {
2303                    for segment in reader.into_iter().flatten() {
2304                        if let Ok(meta) = Self::from_bytes(segment.data) {
2305                            return Some(meta);
2306                        }
2307                    }
2308                }
2309                _ => {}
2310            }
2311        }
2312        None
2313    }
2314
2315    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2316        #[derive(Deserialize)]
2317        struct RawManifest {
2318            pack_id: String,
2319            version: String,
2320            #[serde(default)]
2321            entry_flows: Vec<String>,
2322            #[serde(default)]
2323            flows: Vec<RawFlow>,
2324            #[serde(default)]
2325            secret_requirements: Vec<greentic_types::SecretRequirement>,
2326        }
2327
2328        #[derive(Deserialize)]
2329        struct RawFlow {
2330            id: String,
2331        }
2332
2333        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2334        let mut entry_flows = if manifest.entry_flows.is_empty() {
2335            manifest.flows.iter().map(|f| f.id.clone()).collect()
2336        } else {
2337            manifest.entry_flows.clone()
2338        };
2339        entry_flows.retain(|id| !id.is_empty());
2340        Ok(Self {
2341            pack_id: manifest.pack_id,
2342            version: manifest.version,
2343            entry_flows,
2344            secret_requirements: manifest.secret_requirements,
2345        })
2346    }
2347
2348    pub fn fallback(path: &Path) -> Self {
2349        let pack_id = path
2350            .file_stem()
2351            .map(|s| s.to_string_lossy().into_owned())
2352            .unwrap_or_else(|| "unknown-pack".to_string());
2353        Self {
2354            pack_id,
2355            version: "0.0.0".to_string(),
2356            entry_flows: Vec::new(),
2357            secret_requirements: Vec::new(),
2358        }
2359    }
2360
2361    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2362        let entry_flows = manifest
2363            .flows
2364            .iter()
2365            .map(|flow| flow.id.as_str().to_string())
2366            .collect::<Vec<_>>();
2367        Self {
2368            pack_id: manifest.pack_id.as_str().to_string(),
2369            version: manifest.version.to_string(),
2370            entry_flows,
2371            secret_requirements: manifest.secret_requirements.clone(),
2372        }
2373    }
2374}