greentic_runner_host/
pack.rs

1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11    ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
15use anyhow::{Context, Result, anyhow, bail};
16use greentic_interfaces_wasmtime::host_helpers::v1::{
17    self as host_v1, HostFns, add_all_v1_to_linker,
18    messaging_session::{
19        MessagingSessionError as MessagingError, MessagingSessionHost, OpAck as MessagingAck,
20        OutboundMessage, TenantCtx as MessagingTenantCtx,
21    },
22    runner_host_http::RunnerHostHttp,
23    runner_host_kv::RunnerHostKv,
24    secrets_store::{SecretsError, SecretsStoreHost},
25    state_store::{
26        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27        StateStoreHost, TenantCtx as StateTenantCtx,
28    },
29    telemetry_logger::{
30        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32        TenantCtx as TelemetryTenantCtx,
33    },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::{
39    EnvId, Flow, StateKey as StoreStateKey, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
40    decode_pack_manifest,
41};
42use host_v1::http_client::{
43    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
44    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
45    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
46    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
47};
48use once_cell::sync::Lazy;
49use parking_lot::Mutex;
50use reqwest::blocking::Client as BlockingClient;
51use runner_core::normalize_under_root;
52use serde::{Deserialize, Serialize};
53use serde_cbor;
54use serde_json::{self, Value};
55use tokio::fs;
56use wasmparser::{Parser, Payload};
57use wasmtime::StoreContextMut;
58use zip::ZipArchive;
59
60use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
61use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
62use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
63
64use crate::config::HostConfig;
65use crate::secrets::{DynSecretsManager, read_secret_blocking};
66use crate::storage::state::STATE_PREFIX;
67use crate::storage::{DynSessionStore, DynStateStore};
68use crate::verify;
69use crate::wasi::RunnerWasiPolicy;
70use tracing::warn;
71use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
72use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
73
74use greentic_flow::model::FlowDoc;
75
/// Loaded pack together with everything the runner keeps alongside it:
/// the compiled components, the decoded manifest/flows, and the host services
/// (HTTP client, stores, secrets, OAuth configuration).
// `dead_code` is allowed for the whole struct; NOTE(review): presumably because
// some fields are only kept for later use — confirm which are still unused.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Component artifact path (wasm file).
    path: PathBuf,
    /// Optional archive (.gtpack) used to load flows/manifests.
    archive_path: Option<PathBuf>,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Engine used to compile the pack's components.
    engine: Engine,
    /// Pack identity (id, version, entry flows, ...).
    metadata: PackMetadata,
    /// Manifest in the current format, when the archive decoded as one.
    manifest: Option<greentic_types::PackManifest>,
    /// Manifest in the legacy format, when decoding the current format failed.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows loaded from the archive, if any.
    flows: Option<PackFlows>,
    /// Compiled components keyed by a string identifier.
    components: HashMap<String, PackComponent>,
    /// Shared blocking HTTP client.
    http_client: Arc<BlockingClient>,
    /// Cache of `ComponentPre` values keyed by string, guarded by a mutex.
    pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
    /// Optional session store.
    session_store: Option<DynSessionStore>,
    /// Optional state store.
    state_store: Option<DynStateStore>,
    /// WASI policy used to build each component's WASI context.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Secrets manager.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
}
98
/// A compiled component that belongs to a pack, with its identifying metadata.
struct PackComponent {
    /// Component name (kept for diagnostics; not read by the code visible here).
    #[allow(dead_code)]
    name: String,
    /// Component version (kept for diagnostics; not read by the code visible here).
    #[allow(dead_code)]
    version: String,
    /// The compiled component itself.
    component: Component,
}
106
107fn build_blocking_client() -> BlockingClient {
108    std::thread::spawn(|| {
109        BlockingClient::builder()
110            .no_proxy()
111            .build()
112            .expect("blocking client")
113    })
114    .join()
115    .expect("client build thread panicked")
116}
117
118fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
119    let (root, candidate) = if path.is_absolute() {
120        let parent = path
121            .parent()
122            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
123        let root = parent
124            .canonicalize()
125            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
126        let file = path
127            .file_name()
128            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
129        (root, PathBuf::from(file))
130    } else {
131        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
132        let base = if let Some(parent) = path.parent() {
133            cwd.join(parent)
134        } else {
135            cwd
136        };
137        let root = base
138            .canonicalize()
139            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
140        let file = path
141            .file_name()
142            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
143        (root, PathBuf::from(file))
144    };
145    let safe = normalize_under_root(&root, &candidate)?;
146    Ok((root, safe))
147}
148
/// Process-wide blocking HTTP client, built lazily on first access via
/// `build_blocking_client` and shared behind an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
150
/// Serializable summary of a flow contained in a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile the flow belongs to (legacy packs store the pack id here).
    pub profile: String,
    /// Version string.
    pub version: String,
    /// Optional human-readable description; defaults to `None` when absent.
    #[serde(default)]
    pub description: Option<String>,
}
161
/// Host-side state that backs the host interfaces implemented in this file
/// (HTTP client, secrets, state store, telemetry, messaging, runner host).
pub struct HostState {
    /// Shared host configuration (tenant, secrets policy, HTTP switch, ...).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name used when a call carries no tenant context.
    default_env: String,
    /// Optional session store (not read by the code visible here).
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store backing the state-store host interface.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real HTTP/secrets/telemetry work.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used for real secret lookups.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state.
    oauth_host: OAuthBrokerHost,
}
174
175impl HostState {
176    #[allow(clippy::default_constructed_unit_structs)]
177    pub fn new(
178        config: Arc<HostConfig>,
179        http_client: Arc<BlockingClient>,
180        mocks: Option<Arc<MockLayer>>,
181        session_store: Option<DynSessionStore>,
182        state_store: Option<DynStateStore>,
183        secrets: DynSecretsManager,
184        oauth_config: Option<OAuthBrokerConfig>,
185    ) -> Result<Self> {
186        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
187        Ok(Self {
188            config,
189            http_client,
190            default_env,
191            session_store,
192            state_store,
193            mocks,
194            secrets,
195            oauth_config,
196            oauth_host: OAuthBrokerHost::default(),
197        })
198    }
199
200    pub fn get_secret(&self, key: &str) -> Result<String> {
201        if !self.config.secrets_policy.is_allowed(key) {
202            bail!("secret {key} is not permitted by bindings policy");
203        }
204        if let Some(mock) = &self.mocks
205            && let Some(value) = mock.secrets_lookup(key)
206        {
207            return Ok(value);
208        }
209        let bytes = read_secret_blocking(&self.secrets, key)
210            .context("failed to read secret from manager")?;
211        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
212        Ok(value)
213    }
214
215    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
216        let tenant_raw = ctx
217            .as_ref()
218            .map(|ctx| ctx.tenant.clone())
219            .unwrap_or_else(|| self.config.tenant.clone());
220        let env_raw = ctx
221            .as_ref()
222            .map(|ctx| ctx.env.clone())
223            .unwrap_or_else(|| self.default_env.clone());
224        let tenant_id = TenantId::from_str(&tenant_raw)
225            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
226        let env_id = EnvId::from_str(&env_raw)
227            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
228        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
229        if let Some(ctx) = ctx {
230            if let Some(team) = ctx.team.or(ctx.team_id) {
231                let team_id =
232                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
233                tenant_ctx = tenant_ctx.with_team(Some(team_id));
234            }
235            if let Some(user) = ctx.user.or(ctx.user_id) {
236                let user_id =
237                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
238                tenant_ctx = tenant_ctx.with_user(Some(user_id));
239            }
240            if let Some(flow) = ctx.flow_id {
241                tenant_ctx = tenant_ctx.with_flow(flow);
242            }
243            if let Some(node) = ctx.node_id {
244                tenant_ctx = tenant_ctx.with_node(node);
245            }
246            if let Some(provider) = ctx.provider_id {
247                tenant_ctx = tenant_ctx.with_provider(provider);
248            }
249            if let Some(session) = ctx.session_id {
250                tenant_ctx = tenant_ctx.with_session(session);
251            }
252            tenant_ctx.trace_id = ctx.trace_id;
253        }
254        Ok(tenant_ctx)
255    }
256
257    fn send_http_request(
258        &mut self,
259        req: HttpRequest,
260        opts: Option<HttpRequestOptionsV1_1>,
261        _ctx: Option<HttpTenantCtx>,
262    ) -> Result<HttpResponse, HttpClientError> {
263        if !self.config.http_enabled {
264            return Err(HttpClientError {
265                code: "denied".into(),
266                message: "http client disabled by policy".into(),
267            });
268        }
269
270        let mut mock_state = None;
271        let raw_body = req.body.clone();
272        if let Some(mock) = &self.mocks
273            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
274        {
275            match mock.http_begin(&meta) {
276                HttpDecision::Mock(response) => {
277                    let headers = response
278                        .headers
279                        .iter()
280                        .map(|(k, v)| (k.clone(), v.clone()))
281                        .collect();
282                    return Ok(HttpResponse {
283                        status: response.status,
284                        headers,
285                        body: response.body.clone().map(|b| b.into_bytes()),
286                    });
287                }
288                HttpDecision::Deny(reason) => {
289                    return Err(HttpClientError {
290                        code: "denied".into(),
291                        message: reason,
292                    });
293                }
294                HttpDecision::Passthrough { record } => {
295                    mock_state = Some((meta, record));
296                }
297            }
298        }
299
300        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
301        let mut builder = self.http_client.request(method, &req.url);
302        for (key, value) in req.headers {
303            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
304                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
305            {
306                builder = builder.header(header, header_value);
307            }
308        }
309
310        if let Some(body) = raw_body.clone() {
311            builder = builder.body(body);
312        }
313
314        if let Some(opts) = opts {
315            if let Some(timeout_ms) = opts.timeout_ms {
316                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
317            }
318            if opts.allow_insecure == Some(true) {
319                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
320            }
321            if let Some(follow_redirects) = opts.follow_redirects
322                && !follow_redirects
323            {
324                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
325            }
326        }
327
328        let response = match builder.send() {
329            Ok(resp) => resp,
330            Err(err) => {
331                warn!(url = %req.url, error = %err, "http client request failed");
332                return Err(HttpClientError {
333                    code: "unavailable".into(),
334                    message: err.to_string(),
335                });
336            }
337        };
338
339        let status = response.status().as_u16();
340        let headers_vec = response
341            .headers()
342            .iter()
343            .map(|(k, v)| {
344                (
345                    k.as_str().to_string(),
346                    v.to_str().unwrap_or_default().to_string(),
347                )
348            })
349            .collect::<Vec<_>>();
350        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
351
352        if let Some((meta, true)) = mock_state.take()
353            && let Some(mock) = &self.mocks
354        {
355            let recorded = HttpMockResponse::new(
356                status,
357                headers_vec.clone().into_iter().collect(),
358                body_bytes
359                    .as_ref()
360                    .map(|b| String::from_utf8_lossy(b).into_owned()),
361            );
362            mock.http_record(&meta, &recorded);
363        }
364
365        Ok(HttpResponse {
366            status,
367            headers: headers_vec,
368            body: body_bytes,
369        })
370    }
371}
372
373impl SecretsStoreHost for HostState {
374    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
375        if !self.config.secrets_policy.is_allowed(&key) {
376            return Err(SecretsError::Denied);
377        }
378        if let Some(mock) = &self.mocks
379            && let Some(value) = mock.secrets_lookup(&key)
380        {
381            return Ok(Some(value.into_bytes()));
382        }
383        match read_secret_blocking(&self.secrets, &key) {
384            Ok(bytes) => Ok(Some(bytes)),
385            Err(err) => {
386                warn!(secret = %key, error = %err, "secret lookup failed");
387                Err(SecretsError::NotFound)
388            }
389        }
390    }
391}
392
impl HttpClientHost for HostState {
    /// Sends `req` through `send_http_request` with no per-request options.
    fn send(
        &mut self,
        req: HttpRequest,
        ctx: Option<HttpTenantCtx>,
    ) -> Result<HttpResponse, HttpClientError> {
        self.send_http_request(req, None, ctx)
    }
}
402
403impl HttpClientHostV1_1 for HostState {
404    fn send(
405        &mut self,
406        req: HttpRequestV1_1,
407        opts: Option<HttpRequestOptionsV1_1>,
408        ctx: Option<HttpTenantCtxV1_1>,
409    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
410        let legacy_req = HttpRequest {
411            method: req.method,
412            url: req.url,
413            headers: req.headers,
414            body: req.body,
415        };
416        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
417            env: ctx.env,
418            tenant: ctx.tenant,
419            tenant_id: ctx.tenant_id,
420            team: ctx.team,
421            team_id: ctx.team_id,
422            user: ctx.user,
423            user_id: ctx.user_id,
424            trace_id: ctx.trace_id,
425            correlation_id: ctx.correlation_id,
426            attributes: ctx.attributes,
427            session_id: ctx.session_id,
428            flow_id: ctx.flow_id,
429            node_id: ctx.node_id,
430            provider_id: ctx.provider_id,
431            deadline_ms: ctx.deadline_ms,
432            attempt: ctx.attempt,
433            idempotency_key: ctx.idempotency_key,
434            impersonation: ctx
435                .impersonation
436                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
437                    actor_id,
438                    reason,
439                }),
440        });
441
442        self.send_http_request(legacy_req, opts, legacy_ctx)
443            .map(|resp| HttpResponseV1_1 {
444                status: resp.status,
445                headers: resp.headers,
446                body: resp.body,
447            })
448            .map_err(|err| HttpClientErrorV1_1 {
449                code: err.code,
450                message: err.message,
451            })
452    }
453}
454
455impl StateStoreHost for HostState {
456    fn read(
457        &mut self,
458        key: HostStateKey,
459        ctx: Option<StateTenantCtx>,
460    ) -> Result<Vec<u8>, StateError> {
461        let store = match self.state_store.as_ref() {
462            Some(store) => store.clone(),
463            None => {
464                return Err(StateError {
465                    code: "unavailable".into(),
466                    message: "state store not configured".into(),
467                });
468            }
469        };
470        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
471            Ok(ctx) => ctx,
472            Err(err) => {
473                return Err(StateError {
474                    code: "invalid-ctx".into(),
475                    message: err.to_string(),
476                });
477            }
478        };
479        let key = StoreStateKey::from(key);
480        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
481            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
482            Ok(None) => Err(StateError {
483                code: "not_found".into(),
484                message: "state key not found".into(),
485            }),
486            Err(err) => Err(StateError {
487                code: "internal".into(),
488                message: err.to_string(),
489            }),
490        }
491    }
492
493    fn write(
494        &mut self,
495        key: HostStateKey,
496        bytes: Vec<u8>,
497        ctx: Option<StateTenantCtx>,
498    ) -> Result<StateOpAck, StateError> {
499        let store = match self.state_store.as_ref() {
500            Some(store) => store.clone(),
501            None => {
502                return Err(StateError {
503                    code: "unavailable".into(),
504                    message: "state store not configured".into(),
505                });
506            }
507        };
508        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
509            Ok(ctx) => ctx,
510            Err(err) => {
511                return Err(StateError {
512                    code: "invalid-ctx".into(),
513                    message: err.to_string(),
514                });
515            }
516        };
517        let key = StoreStateKey::from(key);
518        let value = serde_json::from_slice(&bytes)
519            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
520        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
521            Ok(()) => Ok(StateOpAck::Ok),
522            Err(err) => Err(StateError {
523                code: "internal".into(),
524                message: err.to_string(),
525            }),
526        }
527    }
528
529    fn delete(
530        &mut self,
531        key: HostStateKey,
532        ctx: Option<StateTenantCtx>,
533    ) -> Result<StateOpAck, StateError> {
534        let store = match self.state_store.as_ref() {
535            Some(store) => store.clone(),
536            None => {
537                return Err(StateError {
538                    code: "unavailable".into(),
539                    message: "state store not configured".into(),
540                });
541            }
542        };
543        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
544            Ok(ctx) => ctx,
545            Err(err) => {
546                return Err(StateError {
547                    code: "invalid-ctx".into(),
548                    message: err.to_string(),
549                });
550            }
551        };
552        let key = StoreStateKey::from(key);
553        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
554            Ok(_) => Ok(StateOpAck::Ok),
555            Err(err) => Err(StateError {
556                code: "internal".into(),
557                message: err.to_string(),
558            }),
559        }
560    }
561}
562
563impl TelemetryLoggerHost for HostState {
564    fn log(
565        &mut self,
566        span: TelemetrySpanContext,
567        fields: Vec<(String, String)>,
568        _ctx: Option<TelemetryTenantCtx>,
569    ) -> Result<TelemetryAck, TelemetryError> {
570        if let Some(mock) = &self.mocks
571            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
572        {
573            return Ok(TelemetryAck::Ok);
574        }
575        let mut map = serde_json::Map::new();
576        for (k, v) in fields {
577            map.insert(k, Value::String(v));
578        }
579        tracing::info!(
580            tenant = %span.tenant,
581            flow_id = %span.flow_id,
582            node = ?span.node_id,
583            provider = %span.provider,
584            fields = %serde_json::Value::Object(map.clone()),
585            "telemetry log from pack"
586        );
587        Ok(TelemetryAck::Ok)
588    }
589}
590
591impl RunnerHostHttp for HostState {
592    fn request(
593        &mut self,
594        method: String,
595        url: String,
596        headers: Vec<String>,
597        body: Option<Vec<u8>>,
598    ) -> Result<Vec<u8>, String> {
599        let req = HttpRequest {
600            method,
601            url,
602            headers: headers
603                .chunks(2)
604                .filter_map(|chunk| {
605                    if chunk.len() == 2 {
606                        Some((chunk[0].clone(), chunk[1].clone()))
607                    } else {
608                        None
609                    }
610                })
611                .collect(),
612            body,
613        };
614        match HttpClientHost::send(self, req, None) {
615            Ok(resp) => Ok(resp.body.unwrap_or_default()),
616            Err(err) => Err(err.message),
617        }
618    }
619}
620
/// Placeholder key/value host: nothing is stored and every lookup misses.
impl RunnerHostKv for HostState {
    /// Always returns `None`; the namespace and key are ignored.
    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
        None
    }

    /// Does nothing; the value is discarded.
    fn put(&mut self, _ns: String, _key: String, _val: String) {}
}
628
/// Placeholder messaging host: sending is not implemented.
impl MessagingSessionHost for HostState {
    /// Always fails with code `unimplemented`; the message and context are ignored.
    fn send(
        &mut self,
        _message: OutboundMessage,
        _ctx: MessagingTenantCtx,
    ) -> Result<MessagingAck, MessagingError> {
        Err(MessagingError {
            code: "unimplemented".into(),
            message: "messaging session host not wired".into(),
        })
    }
}
641
/// Result of decoding `manifest.cbor` from a pack archive, in either format.
enum ManifestLoad {
    /// The manifest decoded in the current format.
    New {
        /// Decoded manifest.
        manifest: Box<greentic_types::PackManifest>,
        /// Flows derived from the manifest.
        flows: PackFlows,
    },
    /// The manifest only decoded in the legacy format.
    Legacy {
        /// Decoded legacy manifest.
        manifest: Box<legacy_pack::PackManifest>,
        /// Flows loaded from the archive's flow JSON files.
        flows: PackFlows,
    },
}
652
653fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
654    let mut archive = ZipArchive::new(File::open(path)?)
655        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
656    let bytes = read_entry(&mut archive, "manifest.cbor")
657        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
658    match decode_pack_manifest(&bytes) {
659        Ok(manifest) => {
660            let cache = PackFlows::from_manifest(manifest.clone());
661            Ok(ManifestLoad::New {
662                manifest: Box::new(manifest),
663                flows: cache,
664            })
665        }
666        Err(err) => {
667            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
668            // Fall back to legacy pack manifest
669            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
670                .context("failed to decode legacy pack manifest from manifest.cbor")?;
671            let flows = load_legacy_flows(&mut archive, &legacy)?;
672            Ok(ManifestLoad::Legacy {
673                manifest: Box::new(legacy),
674                flows,
675            })
676        }
677    }
678}
679
680fn load_legacy_flows(
681    archive: &mut ZipArchive<File>,
682    manifest: &legacy_pack::PackManifest,
683) -> Result<PackFlows> {
684    let mut flows = HashMap::new();
685    let mut descriptors = Vec::new();
686
687    for entry in &manifest.flows {
688        let bytes = read_entry(archive, &entry.file_json)
689            .with_context(|| format!("missing flow json {}", entry.file_json))?;
690        let doc: FlowDoc = serde_json::from_slice(&bytes)
691            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
692        let normalized = normalize_flow_doc(doc);
693        let flow_ir = flow_doc_to_ir(normalized)?;
694        let flow = flow_ir_to_flow(flow_ir)?;
695
696        descriptors.push(FlowDescriptor {
697            id: entry.id.clone(),
698            flow_type: entry.kind.clone(),
699            profile: manifest.meta.pack_id.clone(),
700            version: manifest.meta.version.to_string(),
701            description: None,
702        });
703        flows.insert(entry.id.clone(), flow);
704    }
705
706    let mut entry_flows = manifest.meta.entry_flows.clone();
707    if entry_flows.is_empty() {
708        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
709    }
710    let metadata = PackMetadata {
711        pack_id: manifest.meta.pack_id.clone(),
712        version: manifest.meta.version.to_string(),
713        entry_flows,
714        secret_requirements: Vec::new(),
715    };
716
717    Ok(PackFlows {
718        descriptors,
719        flows,
720        metadata,
721    })
722}
723
/// Per-component store data: the host services plus the WASI context and
/// resource table handed out through `WasiView`.
pub struct ComponentState {
    /// Host services exposed to the component.
    pub host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table paired with the WASI context.
    resource_table: ResourceTable,
}
729
730impl ComponentState {
731    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
732        let wasi_ctx = policy
733            .instantiate()
734            .context("failed to build WASI context")?;
735        Ok(Self {
736            host,
737            wasi_ctx,
738            resource_table: ResourceTable::new(),
739        })
740    }
741
742    fn host_mut(&mut self) -> &mut HostState {
743        &mut self.host
744    }
745}
746
/// Component control interface: cancellation is never requested and yielding
/// does nothing.
impl control::Host for ComponentState {
    /// Always reports that the component should keep running.
    fn should_cancel(&mut self) -> bool {
        false
    }

    /// Cooperative yield point; currently a no-op.
    fn yield_now(&mut self) {
        // no-op cooperative yield
    }
}
756
757fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
758    let mut inst = linker.instance("greentic:component/control@0.4.0")?;
759    inst.func_wrap(
760        "should-cancel",
761        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
762            let host = caller.data_mut();
763            Ok((ComponentControlHost::should_cancel(host),))
764        },
765    )?;
766    inst.func_wrap(
767        "yield-now",
768        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
769            let host = caller.data_mut();
770            ComponentControlHost::yield_now(host);
771            Ok(())
772        },
773    )?;
774    Ok(())
775}
776
/// Registers WASI and the v1 host interfaces on `linker`.
///
/// Every interface that is enabled gets an accessor projecting
/// `ComponentState` to its `HostState`, which implements the host traits.
/// No accessor is supplied for the OAuth broker here.
///
/// # Errors
///
/// Propagates any error from either registration call.
pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            // NOTE(review): presumably registered elsewhere via `OAuthHostContext` — confirm.
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            messaging_session: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            state_store: Some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
795
/// Exposes the tenant, environment, and OAuth broker state held by the host.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// The host's default environment name.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the OAuth broker host state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, when one was supplied.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
813
impl WasiView for ComponentState {
    /// Hands out mutable borrows of the WASI context and its resource table.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi_ctx,
            table: &mut self.resource_table,
        }
    }
}
822
// SAFETY(review): no justification for these impls is recorded in this file.
// They assert that `ComponentState` (a `HostState`, a `WasiCtx`, and a
// `ResourceTable`) may be moved to and shared between threads. Presumably the
// automatic derivation fails for one of those fields — TODO confirm which one
// and why manually asserting `Send`/`Sync` is sound.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
827
828impl PackRuntime {
    /// Loads a pack (or a bare `.wasm` component) and builds its runtime.
    ///
    /// `path` is normalised with `normalize_pack_path`. When the normalised
    /// path has a `.wasm` extension (case-insensitive) it is treated as a
    /// single component; otherwise it is treated as a pack archive. An explicit
    /// `archive_source`, when given, takes precedence as the archive from which
    /// the manifest, flows and components are read.
    ///
    /// When `verify_archive` is set, `verify::verify_pack` runs against the
    /// archive (or against `path` itself when there is no archive) before the
    /// file contents are read.
    ///
    /// # Errors
    ///
    /// Fails when a path cannot be normalised, verification fails, the file at
    /// `path` cannot be read, or a bare component fails to compile. Manifest
    /// and archive-component load failures are only logged: the runtime is
    /// then returned without flows and/or with an empty component map.
    #[allow(clippy::too_many_arguments)]
    pub async fn load(
        path: impl AsRef<Path>,
        config: Arc<HostConfig>,
        mocks: Option<Arc<MockLayer>>,
        archive_source: Option<&Path>,
        session_store: Option<DynSessionStore>,
        state_store: Option<DynStateStore>,
        wasi_policy: Arc<RunnerWasiPolicy>,
        secrets: DynSecretsManager,
        oauth_config: Option<OAuthBrokerConfig>,
        verify_archive: bool,
    ) -> Result<Self> {
        let path = path.as_ref();
        let (_pack_root, safe_path) = normalize_pack_path(path)?;
        // A `.wasm` extension (any case) marks a bare component rather than an archive.
        let is_component = safe_path
            .extension()
            .and_then(|ext| ext.to_str())
            .map(|ext| ext.eq_ignore_ascii_case("wasm"))
            .unwrap_or(false);
        // Pick the archive to read: explicit source first, otherwise the pack
        // path itself unless that path is a bare component.
        let archive_hint_path = if let Some(source) = archive_source {
            let (_, normalized) = normalize_pack_path(source)?;
            Some(normalized)
        } else if is_component {
            None
        } else {
            Some(safe_path.clone())
        };
        let archive_hint = archive_hint_path.as_deref();
        if verify_archive {
            // Without an archive the component file itself is the verification target.
            let verify_target = archive_hint.unwrap_or(&safe_path);
            verify::verify_pack(verify_target).await?;
            tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
        }
        let engine = Engine::default();
        let wasm_bytes = fs::read(&safe_path).await?;
        // Start from metadata embedded in the file, or a path-derived fallback;
        // a successfully parsed manifest replaces it below.
        let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
            .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
        let mut manifest = None;
        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
        let flows = if let Some(archive_path) = archive_hint {
            match load_manifest_and_flows(archive_path) {
                Ok(ManifestLoad::New {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    manifest = Some(*m);
                    Some(cache)
                }
                Ok(ManifestLoad::Legacy {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    legacy_manifest = Some(m);
                    Some(cache)
                }
                Err(err) => {
                    // Best effort: an unreadable manifest leaves the pack without flows.
                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
                    None
                }
            }
        } else {
            None
        };
        // Components come from the archive (driven by whichever manifest parsed)
        // or, for a bare component, from the bytes already read above.
        let components = if let Some(archive_path) = archive_hint {
            if let Some(new_manifest) = manifest.as_ref() {
                match load_components_from_archive(&engine, archive_path, Some(new_manifest)) {
                    Ok(map) => map,
                    Err(err) => {
                        warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
                        HashMap::new()
                    }
                }
            } else if let Some(legacy) = legacy_manifest.as_ref() {
                match load_legacy_components_from_archive(&engine, archive_path, legacy) {
                    Ok(map) => map,
                    Err(err) => {
                        warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
                        HashMap::new()
                    }
                }
            } else {
                // No manifest parsed, so there is nothing to say which components exist.
                HashMap::new()
            }
        } else if is_component {
            // The single component is registered under its file stem.
            let name = safe_path
                .file_stem()
                .map(|s| s.to_string_lossy().to_string())
                .unwrap_or_else(|| "component".to_string());
            let component = Component::from_binary(&engine, &wasm_bytes)?;
            let mut map = HashMap::new();
            map.insert(
                name.clone(),
                PackComponent {
                    name,
                    version: metadata.version.clone(),
                    component,
                },
            );
            map
        } else {
            HashMap::new()
        };
        let http_client = Arc::clone(&HTTP_CLIENT);
        Ok(Self {
            path: safe_path,
            archive_path: archive_hint.map(Path::to_path_buf),
            config,
            engine,
            metadata,
            manifest,
            legacy_manifest,
            mocks,
            flows,
            components,
            http_client,
            pre_cache: Mutex::new(HashMap::new()),
            session_store,
            state_store,
            wasi_policy,
            secrets,
            oauth_config,
        })
    }
955
956    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
957        if let Some(cache) = &self.flows {
958            return Ok(cache.descriptors.clone());
959        }
960        if let Some(manifest) = &self.manifest {
961            let descriptors = manifest
962                .flows
963                .iter()
964                .map(|flow| FlowDescriptor {
965                    id: flow.id.as_str().to_string(),
966                    flow_type: flow_kind_to_str(flow.kind).to_string(),
967                    profile: manifest.pack_id.as_str().to_string(),
968                    version: manifest.version.to_string(),
969                    description: None,
970                })
971                .collect();
972            return Ok(descriptors);
973        }
974        Ok(Vec::new())
975    }
976
977    #[allow(dead_code)]
978    pub async fn run_flow(
979        &self,
980        flow_id: &str,
981        input: serde_json::Value,
982    ) -> Result<serde_json::Value> {
983        let pack = Arc::new(
984            PackRuntime::load(
985                &self.path,
986                Arc::clone(&self.config),
987                self.mocks.clone(),
988                self.archive_path.as_deref(),
989                self.session_store.clone(),
990                self.state_store.clone(),
991                Arc::clone(&self.wasi_policy),
992                self.secrets.clone(),
993                self.oauth_config.clone(),
994                false,
995            )
996            .await?,
997        );
998
999        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1000        let retry_config = self.config.retry_config().into();
1001        let mocks = pack.mocks.as_deref();
1002        let tenant = self.config.tenant.as_str();
1003
1004        let ctx = FlowContext {
1005            tenant,
1006            flow_id,
1007            node_id: None,
1008            tool: None,
1009            action: None,
1010            session_id: None,
1011            provider_id: None,
1012            retry_config,
1013            observer: None,
1014            mocks,
1015        };
1016
1017        let execution = engine.execute(ctx, input).await?;
1018        match execution.status {
1019            FlowStatus::Completed => Ok(execution.output),
1020            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1021                "status": "pending",
1022                "reason": wait.reason,
1023                "resume": wait.snapshot,
1024                "response": execution.output,
1025            })),
1026        }
1027    }
1028
1029    pub async fn invoke_component(
1030        &self,
1031        component_ref: &str,
1032        ctx: ComponentExecCtx,
1033        operation: &str,
1034        _config_json: Option<String>,
1035        input_json: String,
1036    ) -> Result<Value> {
1037        let pack_component = self
1038            .components
1039            .get(component_ref)
1040            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1041
1042        let pre = if let Some(pre) = self.pre_cache.lock().get(component_ref).cloned() {
1043            pre
1044        } else {
1045            let mut linker = Linker::new(&self.engine);
1046            register_all(&mut linker)?;
1047            add_component_control_to_linker(&mut linker)?;
1048            let pre = ComponentPre::new(
1049                linker
1050                    .instantiate_pre(&pack_component.component)
1051                    .map_err(|err| anyhow!(err))?,
1052            )
1053            .map_err(|err| anyhow!(err))?;
1054            self.pre_cache
1055                .lock()
1056                .insert(component_ref.to_string(), pre.clone());
1057            pre
1058        };
1059
1060        let host_state = HostState::new(
1061            Arc::clone(&self.config),
1062            Arc::clone(&self.http_client),
1063            self.mocks.clone(),
1064            self.session_store.clone(),
1065            self.state_store.clone(),
1066            Arc::clone(&self.secrets),
1067            self.oauth_config.clone(),
1068        )?;
1069        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1070        let mut store = wasmtime::Store::new(&self.engine, store_state);
1071        let bindings = pre
1072            .instantiate_async(&mut store)
1073            .await
1074            .map_err(|err| anyhow!(err))?;
1075        let node = bindings.greentic_component_node();
1076
1077        let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1078
1079        match result {
1080            InvokeResult::Ok(body) => {
1081                if body.is_empty() {
1082                    return Ok(Value::Null);
1083                }
1084                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1085            }
1086            InvokeResult::Err(NodeError {
1087                code,
1088                message,
1089                retryable,
1090                backoff_ms,
1091                details,
1092            }) => {
1093                let mut obj = serde_json::Map::new();
1094                obj.insert("ok".into(), Value::Bool(false));
1095                let mut error = serde_json::Map::new();
1096                error.insert("code".into(), Value::String(code));
1097                error.insert("message".into(), Value::String(message));
1098                error.insert("retryable".into(), Value::Bool(retryable));
1099                if let Some(backoff) = backoff_ms {
1100                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1101                }
1102                if let Some(details) = details {
1103                    error.insert(
1104                        "details".into(),
1105                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1106                    );
1107                }
1108                obj.insert("error".into(), Value::Object(error));
1109                Ok(Value::Object(obj))
1110            }
1111        }
1112    }
1113
1114    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1115        if let Some(cache) = &self.flows {
1116            return cache
1117                .flows
1118                .get(flow_id)
1119                .cloned()
1120                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1121        }
1122        if let Some(manifest) = &self.manifest {
1123            let entry = manifest
1124                .flows
1125                .iter()
1126                .find(|f| f.id.as_str() == flow_id)
1127                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1128            return Ok(entry.flow.clone());
1129        }
1130        bail!("flow '{flow_id}' not available (pack exports disabled)")
1131    }
1132
    /// Returns the metadata resolved for this pack when it was constructed.
    pub fn metadata(&self) -> &PackMetadata {
        &self.metadata
    }
1136
    /// Returns the secret requirements recorded in the pack metadata.
    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
        &self.metadata.secret_requirements
    }
1140
1141    pub fn missing_secrets(
1142        &self,
1143        tenant_ctx: &TypesTenantCtx,
1144    ) -> Vec<greentic_types::SecretRequirement> {
1145        let env = tenant_ctx.env.as_str().to_string();
1146        let tenant = tenant_ctx.tenant.as_str().to_string();
1147        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1148        self.required_secrets()
1149            .iter()
1150            .filter(|req| {
1151                // scope must match current context if provided
1152                if let Some(scope) = &req.scope {
1153                    if scope.env != env {
1154                        return false;
1155                    }
1156                    if scope.tenant != tenant {
1157                        return false;
1158                    }
1159                    if let Some(ref team_req) = scope.team
1160                        && team.as_ref() != Some(team_req)
1161                    {
1162                        return false;
1163                    }
1164                }
1165                read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1166            })
1167            .cloned()
1168            .collect()
1169    }
1170
1171    pub fn for_component_test(
1172        components: Vec<(String, PathBuf)>,
1173        flows: HashMap<String, FlowIR>,
1174        config: Arc<HostConfig>,
1175    ) -> Result<Self> {
1176        let engine = Engine::default();
1177        let mut component_map = HashMap::new();
1178        for (name, path) in components {
1179            if !path.exists() {
1180                bail!("component artifact missing: {}", path.display());
1181            }
1182            let wasm_bytes = std::fs::read(&path)?;
1183            let component = Component::from_binary(&engine, &wasm_bytes)
1184                .with_context(|| format!("failed to compile component {}", path.display()))?;
1185            component_map.insert(
1186                name.clone(),
1187                PackComponent {
1188                    name,
1189                    version: "0.0.0".into(),
1190                    component,
1191                },
1192            );
1193        }
1194
1195        let mut flow_map = HashMap::new();
1196        let mut descriptors = Vec::new();
1197        for (id, ir) in flows {
1198            let flow_type = ir.flow_type.clone();
1199            let flow = flow_ir_to_flow(ir)?;
1200            flow_map.insert(id.clone(), flow);
1201            descriptors.push(FlowDescriptor {
1202                id: id.clone(),
1203                flow_type,
1204                profile: "test".into(),
1205                version: "0.0.0".into(),
1206                description: None,
1207            });
1208        }
1209        let flows_cache = PackFlows {
1210            descriptors: descriptors.clone(),
1211            flows: flow_map,
1212            metadata: PackMetadata::fallback(Path::new("component-test")),
1213        };
1214
1215        Ok(Self {
1216            path: PathBuf::new(),
1217            archive_path: None,
1218            config,
1219            engine,
1220            metadata: PackMetadata::fallback(Path::new("component-test")),
1221            manifest: None,
1222            legacy_manifest: None,
1223            mocks: None,
1224            flows: Some(flows_cache),
1225            components: component_map,
1226            http_client: Arc::clone(&HTTP_CLIENT),
1227            pre_cache: Mutex::new(HashMap::new()),
1228            session_store: None,
1229            state_store: None,
1230            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1231            secrets: crate::secrets::default_manager(),
1232            oauth_config: None,
1233        })
1234    }
1235}
1236
/// Flows of a pack together with the metadata they were loaded with.
struct PackFlows {
    /// One descriptor per flow, as returned by `PackRuntime::list_flows`.
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Pack metadata associated with these flows.
    metadata: PackMetadata,
}
1242
1243impl PackFlows {
1244    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1245        let descriptors = manifest
1246            .flows
1247            .iter()
1248            .map(|entry| FlowDescriptor {
1249                id: entry.id.as_str().to_string(),
1250                flow_type: flow_kind_to_str(entry.kind).to_string(),
1251                profile: manifest.pack_id.as_str().to_string(),
1252                version: manifest.version.to_string(),
1253                description: None,
1254            })
1255            .collect();
1256        let mut flows = HashMap::new();
1257        for entry in &manifest.flows {
1258            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1259        }
1260        Self {
1261            metadata: PackMetadata::from_manifest(&manifest),
1262            descriptors,
1263            flows,
1264        }
1265    }
1266}
1267
1268fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1269    match kind {
1270        greentic_types::FlowKind::Messaging => "messaging",
1271        greentic_types::FlowKind::Event => "event",
1272        greentic_types::FlowKind::ComponentConfig => "component-config",
1273        greentic_types::FlowKind::Job => "job",
1274        greentic_types::FlowKind::Http => "http",
1275    }
1276}
1277
1278fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1279    let mut file = archive
1280        .by_name(name)
1281        .with_context(|| format!("entry {name} missing from archive"))?;
1282    let mut buf = Vec::new();
1283    file.read_to_end(&mut buf)?;
1284    Ok(buf)
1285}
1286
1287fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1288    for node in doc.nodes.values_mut() {
1289        if node.component.is_empty()
1290            && let Some((component_ref, payload)) = node.raw.iter().next()
1291        {
1292            if component_ref.starts_with("emit.") {
1293                node.component = component_ref.clone();
1294                node.payload = payload.clone();
1295                node.raw.clear();
1296                continue;
1297            }
1298            let (target_component, operation, input, config) =
1299                infer_component_exec(payload, component_ref);
1300            let mut payload_obj = serde_json::Map::new();
1301            // component.exec is meta; ensure the payload carries the actual target component.
1302            payload_obj.insert("component".into(), Value::String(target_component));
1303            payload_obj.insert("operation".into(), Value::String(operation));
1304            payload_obj.insert("input".into(), input);
1305            if let Some(cfg) = config {
1306                payload_obj.insert("config".into(), cfg);
1307            }
1308            node.component = "component.exec".to_string();
1309            node.payload = Value::Object(payload_obj);
1310        }
1311    }
1312    doc
1313}
1314
1315fn infer_component_exec(
1316    payload: &Value,
1317    component_ref: &str,
1318) -> (String, String, Value, Option<Value>) {
1319    let default_op = if component_ref.starts_with("templating.") {
1320        "render"
1321    } else {
1322        "invoke"
1323    }
1324    .to_string();
1325
1326    if let Value::Object(map) = payload {
1327        let op = map
1328            .get("op")
1329            .or_else(|| map.get("operation"))
1330            .and_then(Value::as_str)
1331            .map(|s| s.to_string())
1332            .unwrap_or_else(|| default_op.clone());
1333
1334        let mut input = map.clone();
1335        let config = input.remove("config");
1336        let component = input
1337            .get("component")
1338            .or_else(|| input.get("component_ref"))
1339            .and_then(Value::as_str)
1340            .map(|s| s.to_string())
1341            .unwrap_or_else(|| component_ref.to_string());
1342        input.remove("component");
1343        input.remove("component_ref");
1344        input.remove("op");
1345        input.remove("operation");
1346        return (component, op, Value::Object(input), config);
1347    }
1348
1349    (component_ref.to_string(), default_op, payload.clone(), None)
1350}
1351
// Unit tests for flow-document normalisation.
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use serde_json::json;
    use std::collections::BTreeMap;

    /// A node that only has a raw `templating.handlebars` entry must become a
    /// `component.exec` node targeting that component with the `render`
    /// operation and the raw payload as input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let mut nodes = BTreeMap::new();
        let mut raw = BTreeMap::new();
        raw.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );
        nodes.insert(
            "start".into(),
            NodeDoc {
                raw,
                routing: json!([{"out": true}]),
                ..Default::default()
            },
        );
        let doc = FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            entrypoints: BTreeMap::new(),
            nodes,
        };

        let normalized = normalize_flow_doc(doc);
        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.component, "component.exec");
        // Accepts both outcomes: the raw map cleared, or the raw entry kept.
        assert!(node.raw.is_empty() || node.raw.contains_key("templating.handlebars"));
        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component"),
            Some(&Value::String("templating.handlebars".into()))
        );
        // `templating.*` references default to the `render` operation.
        assert_eq!(
            payload.get("operation"),
            Some(&Value::String("render".into()))
        );
        let input = payload.get("input").unwrap();
        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
    }
}
1404
1405fn load_components_from_archive(
1406    engine: &Engine,
1407    path: &Path,
1408    manifest: Option<&greentic_types::PackManifest>,
1409) -> Result<HashMap<String, PackComponent>> {
1410    let mut archive = ZipArchive::new(File::open(path)?)
1411        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1412    let mut components = HashMap::new();
1413    if let Some(manifest) = manifest {
1414        for entry in &manifest.components {
1415            let file_name = format!("components/{}.wasm", entry.id.as_str());
1416            let bytes = read_entry(&mut archive, &file_name)
1417                .with_context(|| format!("missing component {}", file_name))?;
1418            let component = Component::from_binary(engine, &bytes)
1419                .with_context(|| format!("failed to compile component {}", entry.id.as_str()))?;
1420            components.insert(
1421                entry.id.as_str().to_string(),
1422                PackComponent {
1423                    name: entry.id.as_str().to_string(),
1424                    version: entry.version.to_string(),
1425                    component,
1426                },
1427            );
1428        }
1429    }
1430    Ok(components)
1431}
1432
1433fn load_legacy_components_from_archive(
1434    engine: &Engine,
1435    path: &Path,
1436    manifest: &legacy_pack::PackManifest,
1437) -> Result<HashMap<String, PackComponent>> {
1438    let mut archive = ZipArchive::new(File::open(path)?)
1439        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1440    let mut components = HashMap::new();
1441    for entry in &manifest.components {
1442        let bytes = read_entry(&mut archive, &entry.file_wasm)
1443            .with_context(|| format!("missing component {}", entry.file_wasm))?;
1444        let component = Component::from_binary(engine, &bytes)
1445            .with_context(|| format!("failed to compile component {}", entry.name))?;
1446        components.insert(
1447            entry.name.clone(),
1448            PackComponent {
1449                name: entry.name.clone(),
1450                version: entry.version.to_string(),
1451                component,
1452            },
1453        );
1454    }
1455    Ok(components)
1456}
1457
/// Identity and requirements of a pack, as resolved at load time.
///
/// Built from an embedded manifest (`from_wasm`), from a parsed pack manifest
/// (`from_manifest`), or synthesised from the file name (`fallback`).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Pack identifier.
    pub pack_id: String,
    /// Pack version string (`0.0.0` for fallback metadata).
    pub version: String,
    /// Ids of the flows recorded for the pack; empty when none are known.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secrets the pack declares it needs.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
1467
1468impl PackMetadata {
1469    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1470        let parser = Parser::new(0);
1471        for payload in parser.parse_all(bytes) {
1472            let payload = payload.ok()?;
1473            match payload {
1474                Payload::CustomSection(section) => {
1475                    if section.name() == "greentic.manifest"
1476                        && let Ok(meta) = Self::from_bytes(section.data())
1477                    {
1478                        return Some(meta);
1479                    }
1480                }
1481                Payload::DataSection(reader) => {
1482                    for segment in reader.into_iter().flatten() {
1483                        if let Ok(meta) = Self::from_bytes(segment.data) {
1484                            return Some(meta);
1485                        }
1486                    }
1487                }
1488                _ => {}
1489            }
1490        }
1491        None
1492    }
1493
1494    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1495        #[derive(Deserialize)]
1496        struct RawManifest {
1497            pack_id: String,
1498            version: String,
1499            #[serde(default)]
1500            entry_flows: Vec<String>,
1501            #[serde(default)]
1502            flows: Vec<RawFlow>,
1503            #[serde(default)]
1504            secret_requirements: Vec<greentic_types::SecretRequirement>,
1505        }
1506
1507        #[derive(Deserialize)]
1508        struct RawFlow {
1509            id: String,
1510        }
1511
1512        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1513        let mut entry_flows = if manifest.entry_flows.is_empty() {
1514            manifest.flows.iter().map(|f| f.id.clone()).collect()
1515        } else {
1516            manifest.entry_flows.clone()
1517        };
1518        entry_flows.retain(|id| !id.is_empty());
1519        Ok(Self {
1520            pack_id: manifest.pack_id,
1521            version: manifest.version,
1522            entry_flows,
1523            secret_requirements: manifest.secret_requirements,
1524        })
1525    }
1526
1527    pub fn fallback(path: &Path) -> Self {
1528        let pack_id = path
1529            .file_stem()
1530            .map(|s| s.to_string_lossy().into_owned())
1531            .unwrap_or_else(|| "unknown-pack".to_string());
1532        Self {
1533            pack_id,
1534            version: "0.0.0".to_string(),
1535            entry_flows: Vec::new(),
1536            secret_requirements: Vec::new(),
1537        }
1538    }
1539
1540    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
1541        let entry_flows = manifest
1542            .flows
1543            .iter()
1544            .map(|flow| flow.id.as_str().to_string())
1545            .collect::<Vec<_>>();
1546        Self {
1547            pack_id: manifest.pack_id.as_str().to_string(),
1548            version: manifest.version.to_string(),
1549            entry_flows,
1550            secret_requirements: manifest.secret_requirements.clone(),
1551        }
1552    }
1553}