greentic_runner_host/
pack.rs

1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11    ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20    self as host_v1, HostFns, add_all_v1_to_linker,
21    runner_host_http::RunnerHostHttp,
22    runner_host_kv::RunnerHostKv,
23    secrets_store::{SecretsError, SecretsStoreHost},
24    state_store::{
25        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26        StateStoreHost, TenantCtx as StateTenantCtx,
27    },
28    telemetry_logger::{
29        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31        TenantCtx as TelemetryTenantCtx,
32    },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::{
38    EnvId, Flow, StateKey as StoreStateKey, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
39    decode_pack_manifest,
40};
41use host_v1::http_client::{
42    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
43    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
44    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
45    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
46};
47use once_cell::sync::Lazy;
48use parking_lot::{Mutex, RwLock};
49use reqwest::blocking::Client as BlockingClient;
50use runner_core::normalize_under_root;
51use serde::{Deserialize, Serialize};
52use serde_cbor;
53use serde_json::{self, Value};
54use tokio::fs;
55use wasmparser::{Parser, Payload};
56use wasmtime::StoreContextMut;
57use zip::ZipArchive;
58
59use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
60use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
61use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
62
63use crate::config::HostConfig;
64use crate::secrets::{DynSecretsManager, read_secret_blocking};
65use crate::storage::state::STATE_PREFIX;
66use crate::storage::{DynSessionStore, DynStateStore};
67use crate::verify;
68use crate::wasi::RunnerWasiPolicy;
69use tracing::warn;
70use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
71use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
72
73use greentic_flow::model::FlowDoc;
74
/// Runtime handle for a loaded pack: the component artifact together with the
/// configuration, engine, manifests, flows, stores and secrets needed to run it.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Component artifact path (wasm file).
    path: PathBuf,
    /// Optional archive (.gtpack) used to load flows/manifests.
    archive_path: Option<PathBuf>,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Engine handle from `crate::runtime_wasmtime`.
    engine: Engine,
    /// Pack metadata (derived from the wasm bytes, or replaced by the manifest's metadata).
    metadata: PackMetadata,
    /// Manifest in the current `greentic_types` format, when one was decoded.
    manifest: Option<greentic_types::PackManifest>,
    /// Manifest in the legacy `greentic_pack::builder` format, when the fallback decode was used.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows loaded from the archive manifest, if any.
    flows: Option<PackFlows>,
    /// Components available to this pack, keyed by a string identifier.
    components: HashMap<String, PackComponent>,
    /// Shared blocking HTTP client.
    http_client: Arc<BlockingClient>,
    /// Cache of `ComponentPre` values behind a mutex, keyed by string.
    pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
    /// Optional session store.
    session_store: Option<DynSessionStore>,
    /// Optional state store.
    state_store: Option<DynStateStore>,
    /// WASI policy used when building component state.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Provider registry, absent until set.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Secrets manager.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
}
98
/// A single component belonging to a pack.
struct PackComponent {
    /// Component name (currently unused, hence the lint allowance).
    #[allow(dead_code)]
    name: String,
    /// Component version string (currently unused, hence the lint allowance).
    #[allow(dead_code)]
    version: String,
    /// The `Component` value from `crate::runtime_wasmtime`.
    component: Component,
}
106
107fn build_blocking_client() -> BlockingClient {
108    std::thread::spawn(|| {
109        BlockingClient::builder()
110            .no_proxy()
111            .build()
112            .expect("blocking client")
113    })
114    .join()
115    .expect("client build thread panicked")
116}
117
118fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
119    let (root, candidate) = if path.is_absolute() {
120        let parent = path
121            .parent()
122            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
123        let root = parent
124            .canonicalize()
125            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
126        let file = path
127            .file_name()
128            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
129        (root, PathBuf::from(file))
130    } else {
131        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
132        let base = if let Some(parent) = path.parent() {
133            cwd.join(parent)
134        } else {
135            cwd
136        };
137        let root = base
138            .canonicalize()
139            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
140        let file = path
141            .file_name()
142            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
143        (root, PathBuf::from(file))
144    };
145    let safe = normalize_under_root(&root, &candidate)?;
146    Ok((root, safe))
147}
148
/// Process-wide blocking HTTP client, created on first access via
/// `build_blocking_client` and shared behind an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
150
/// Serializable summary of a flow exposed by a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; (de)serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// NOTE(review): legacy packs fill this with the pack id — confirm the intended meaning.
    pub profile: String,
    /// Version string.
    pub version: String,
    /// Optional human-readable description; defaults to `None` when absent from the input.
    #[serde(default)]
    pub description: Option<String>,
}
161
/// Host-side state backing the interfaces exposed to pack components
/// (secrets, HTTP client, state store, telemetry, runner host HTTP/KV).
pub struct HostState {
    /// Shared host configuration (tenant, HTTP and secrets policy).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name used when a call supplies no tenant context.
    default_env: String,
    /// Optional session store (currently unused, hence the lint allowance).
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store; state-store calls fail with `unavailable` when absent.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real secrets/HTTP/telemetry handling.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used for real secret lookups.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state.
    oauth_host: OAuthBrokerHost,
}
174
175impl HostState {
176    #[allow(clippy::default_constructed_unit_structs)]
177    pub fn new(
178        config: Arc<HostConfig>,
179        http_client: Arc<BlockingClient>,
180        mocks: Option<Arc<MockLayer>>,
181        session_store: Option<DynSessionStore>,
182        state_store: Option<DynStateStore>,
183        secrets: DynSecretsManager,
184        oauth_config: Option<OAuthBrokerConfig>,
185    ) -> Result<Self> {
186        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
187        Ok(Self {
188            config,
189            http_client,
190            default_env,
191            session_store,
192            state_store,
193            mocks,
194            secrets,
195            oauth_config,
196            oauth_host: OAuthBrokerHost::default(),
197        })
198    }
199
200    pub fn get_secret(&self, key: &str) -> Result<String> {
201        if provider_core_only::is_enabled() {
202            bail!(provider_core_only::blocked_message("secrets"))
203        }
204        if !self.config.secrets_policy.is_allowed(key) {
205            bail!("secret {key} is not permitted by bindings policy");
206        }
207        if let Some(mock) = &self.mocks
208            && let Some(value) = mock.secrets_lookup(key)
209        {
210            return Ok(value);
211        }
212        let bytes = read_secret_blocking(&self.secrets, key)
213            .context("failed to read secret from manager")?;
214        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
215        Ok(value)
216    }
217
218    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
219        let tenant_raw = ctx
220            .as_ref()
221            .map(|ctx| ctx.tenant.clone())
222            .unwrap_or_else(|| self.config.tenant.clone());
223        let env_raw = ctx
224            .as_ref()
225            .map(|ctx| ctx.env.clone())
226            .unwrap_or_else(|| self.default_env.clone());
227        let tenant_id = TenantId::from_str(&tenant_raw)
228            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
229        let env_id = EnvId::from_str(&env_raw)
230            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
231        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
232        if let Some(ctx) = ctx {
233            if let Some(team) = ctx.team.or(ctx.team_id) {
234                let team_id =
235                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
236                tenant_ctx = tenant_ctx.with_team(Some(team_id));
237            }
238            if let Some(user) = ctx.user.or(ctx.user_id) {
239                let user_id =
240                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
241                tenant_ctx = tenant_ctx.with_user(Some(user_id));
242            }
243            if let Some(flow) = ctx.flow_id {
244                tenant_ctx = tenant_ctx.with_flow(flow);
245            }
246            if let Some(node) = ctx.node_id {
247                tenant_ctx = tenant_ctx.with_node(node);
248            }
249            if let Some(provider) = ctx.provider_id {
250                tenant_ctx = tenant_ctx.with_provider(provider);
251            }
252            if let Some(session) = ctx.session_id {
253                tenant_ctx = tenant_ctx.with_session(session);
254            }
255            tenant_ctx.trace_id = ctx.trace_id;
256        }
257        Ok(tenant_ctx)
258    }
259
260    fn send_http_request(
261        &mut self,
262        req: HttpRequest,
263        opts: Option<HttpRequestOptionsV1_1>,
264        _ctx: Option<HttpTenantCtx>,
265    ) -> Result<HttpResponse, HttpClientError> {
266        if !self.config.http_enabled {
267            return Err(HttpClientError {
268                code: "denied".into(),
269                message: "http client disabled by policy".into(),
270            });
271        }
272
273        let mut mock_state = None;
274        let raw_body = req.body.clone();
275        if let Some(mock) = &self.mocks
276            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
277        {
278            match mock.http_begin(&meta) {
279                HttpDecision::Mock(response) => {
280                    let headers = response
281                        .headers
282                        .iter()
283                        .map(|(k, v)| (k.clone(), v.clone()))
284                        .collect();
285                    return Ok(HttpResponse {
286                        status: response.status,
287                        headers,
288                        body: response.body.clone().map(|b| b.into_bytes()),
289                    });
290                }
291                HttpDecision::Deny(reason) => {
292                    return Err(HttpClientError {
293                        code: "denied".into(),
294                        message: reason,
295                    });
296                }
297                HttpDecision::Passthrough { record } => {
298                    mock_state = Some((meta, record));
299                }
300            }
301        }
302
303        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
304        let mut builder = self.http_client.request(method, &req.url);
305        for (key, value) in req.headers {
306            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
307                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
308            {
309                builder = builder.header(header, header_value);
310            }
311        }
312
313        if let Some(body) = raw_body.clone() {
314            builder = builder.body(body);
315        }
316
317        if let Some(opts) = opts {
318            if let Some(timeout_ms) = opts.timeout_ms {
319                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
320            }
321            if opts.allow_insecure == Some(true) {
322                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
323            }
324            if let Some(follow_redirects) = opts.follow_redirects
325                && !follow_redirects
326            {
327                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
328            }
329        }
330
331        let response = match builder.send() {
332            Ok(resp) => resp,
333            Err(err) => {
334                warn!(url = %req.url, error = %err, "http client request failed");
335                return Err(HttpClientError {
336                    code: "unavailable".into(),
337                    message: err.to_string(),
338                });
339            }
340        };
341
342        let status = response.status().as_u16();
343        let headers_vec = response
344            .headers()
345            .iter()
346            .map(|(k, v)| {
347                (
348                    k.as_str().to_string(),
349                    v.to_str().unwrap_or_default().to_string(),
350                )
351            })
352            .collect::<Vec<_>>();
353        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
354
355        if let Some((meta, true)) = mock_state.take()
356            && let Some(mock) = &self.mocks
357        {
358            let recorded = HttpMockResponse::new(
359                status,
360                headers_vec.clone().into_iter().collect(),
361                body_bytes
362                    .as_ref()
363                    .map(|b| String::from_utf8_lossy(b).into_owned()),
364            );
365            mock.http_record(&meta, &recorded);
366        }
367
368        Ok(HttpResponse {
369            status,
370            headers: headers_vec,
371            body: body_bytes,
372        })
373    }
374}
375
376impl SecretsStoreHost for HostState {
377    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
378        if provider_core_only::is_enabled() {
379            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
380            return Err(SecretsError::Denied);
381        }
382        if !self.config.secrets_policy.is_allowed(&key) {
383            return Err(SecretsError::Denied);
384        }
385        if let Some(mock) = &self.mocks
386            && let Some(value) = mock.secrets_lookup(&key)
387        {
388            return Ok(Some(value.into_bytes()));
389        }
390        match read_secret_blocking(&self.secrets, &key) {
391            Ok(bytes) => Ok(Some(bytes)),
392            Err(err) => {
393                warn!(secret = %key, error = %err, "secret lookup failed");
394                Err(SecretsError::NotFound)
395            }
396        }
397    }
398}
399
400impl HttpClientHost for HostState {
401    fn send(
402        &mut self,
403        req: HttpRequest,
404        ctx: Option<HttpTenantCtx>,
405    ) -> Result<HttpResponse, HttpClientError> {
406        self.send_http_request(req, None, ctx)
407    }
408}
409
410impl HttpClientHostV1_1 for HostState {
411    fn send(
412        &mut self,
413        req: HttpRequestV1_1,
414        opts: Option<HttpRequestOptionsV1_1>,
415        ctx: Option<HttpTenantCtxV1_1>,
416    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
417        let legacy_req = HttpRequest {
418            method: req.method,
419            url: req.url,
420            headers: req.headers,
421            body: req.body,
422        };
423        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
424            env: ctx.env,
425            tenant: ctx.tenant,
426            tenant_id: ctx.tenant_id,
427            team: ctx.team,
428            team_id: ctx.team_id,
429            user: ctx.user,
430            user_id: ctx.user_id,
431            trace_id: ctx.trace_id,
432            correlation_id: ctx.correlation_id,
433            attributes: ctx.attributes,
434            session_id: ctx.session_id,
435            flow_id: ctx.flow_id,
436            node_id: ctx.node_id,
437            provider_id: ctx.provider_id,
438            deadline_ms: ctx.deadline_ms,
439            attempt: ctx.attempt,
440            idempotency_key: ctx.idempotency_key,
441            impersonation: ctx
442                .impersonation
443                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
444                    actor_id,
445                    reason,
446                }),
447        });
448
449        self.send_http_request(legacy_req, opts, legacy_ctx)
450            .map(|resp| HttpResponseV1_1 {
451                status: resp.status,
452                headers: resp.headers,
453                body: resp.body,
454            })
455            .map_err(|err| HttpClientErrorV1_1 {
456                code: err.code,
457                message: err.message,
458            })
459    }
460}
461
462impl StateStoreHost for HostState {
463    fn read(
464        &mut self,
465        key: HostStateKey,
466        ctx: Option<StateTenantCtx>,
467    ) -> Result<Vec<u8>, StateError> {
468        let store = match self.state_store.as_ref() {
469            Some(store) => store.clone(),
470            None => {
471                return Err(StateError {
472                    code: "unavailable".into(),
473                    message: "state store not configured".into(),
474                });
475            }
476        };
477        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
478            Ok(ctx) => ctx,
479            Err(err) => {
480                return Err(StateError {
481                    code: "invalid-ctx".into(),
482                    message: err.to_string(),
483                });
484            }
485        };
486        let key = StoreStateKey::from(key);
487        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
488            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
489            Ok(None) => Err(StateError {
490                code: "not_found".into(),
491                message: "state key not found".into(),
492            }),
493            Err(err) => Err(StateError {
494                code: "internal".into(),
495                message: err.to_string(),
496            }),
497        }
498    }
499
500    fn write(
501        &mut self,
502        key: HostStateKey,
503        bytes: Vec<u8>,
504        ctx: Option<StateTenantCtx>,
505    ) -> Result<StateOpAck, StateError> {
506        let store = match self.state_store.as_ref() {
507            Some(store) => store.clone(),
508            None => {
509                return Err(StateError {
510                    code: "unavailable".into(),
511                    message: "state store not configured".into(),
512                });
513            }
514        };
515        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
516            Ok(ctx) => ctx,
517            Err(err) => {
518                return Err(StateError {
519                    code: "invalid-ctx".into(),
520                    message: err.to_string(),
521                });
522            }
523        };
524        let key = StoreStateKey::from(key);
525        let value = serde_json::from_slice(&bytes)
526            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
527        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
528            Ok(()) => Ok(StateOpAck::Ok),
529            Err(err) => Err(StateError {
530                code: "internal".into(),
531                message: err.to_string(),
532            }),
533        }
534    }
535
536    fn delete(
537        &mut self,
538        key: HostStateKey,
539        ctx: Option<StateTenantCtx>,
540    ) -> Result<StateOpAck, StateError> {
541        let store = match self.state_store.as_ref() {
542            Some(store) => store.clone(),
543            None => {
544                return Err(StateError {
545                    code: "unavailable".into(),
546                    message: "state store not configured".into(),
547                });
548            }
549        };
550        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
551            Ok(ctx) => ctx,
552            Err(err) => {
553                return Err(StateError {
554                    code: "invalid-ctx".into(),
555                    message: err.to_string(),
556                });
557            }
558        };
559        let key = StoreStateKey::from(key);
560        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
561            Ok(_) => Ok(StateOpAck::Ok),
562            Err(err) => Err(StateError {
563                code: "internal".into(),
564                message: err.to_string(),
565            }),
566        }
567    }
568}
569
570impl TelemetryLoggerHost for HostState {
571    fn log(
572        &mut self,
573        span: TelemetrySpanContext,
574        fields: Vec<(String, String)>,
575        _ctx: Option<TelemetryTenantCtx>,
576    ) -> Result<TelemetryAck, TelemetryError> {
577        if let Some(mock) = &self.mocks
578            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
579        {
580            return Ok(TelemetryAck::Ok);
581        }
582        let mut map = serde_json::Map::new();
583        for (k, v) in fields {
584            map.insert(k, Value::String(v));
585        }
586        tracing::info!(
587            tenant = %span.tenant,
588            flow_id = %span.flow_id,
589            node = ?span.node_id,
590            provider = %span.provider,
591            fields = %serde_json::Value::Object(map.clone()),
592            "telemetry log from pack"
593        );
594        Ok(TelemetryAck::Ok)
595    }
596}
597
598impl RunnerHostHttp for HostState {
599    fn request(
600        &mut self,
601        method: String,
602        url: String,
603        headers: Vec<String>,
604        body: Option<Vec<u8>>,
605    ) -> Result<Vec<u8>, String> {
606        let req = HttpRequest {
607            method,
608            url,
609            headers: headers
610                .chunks(2)
611                .filter_map(|chunk| {
612                    if chunk.len() == 2 {
613                        Some((chunk[0].clone(), chunk[1].clone()))
614                    } else {
615                        None
616                    }
617                })
618                .collect(),
619            body,
620        };
621        match HttpClientHost::send(self, req, None) {
622            Ok(resp) => Ok(resp.body.unwrap_or_default()),
623            Err(err) => Err(err.message),
624        }
625    }
626}
627
/// Placeholder key-value host: nothing is stored and every lookup misses.
impl RunnerHostKv for HostState {
    /// Always returns `None`; the namespace and key are ignored.
    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
        None
    }

    /// No-op; the value is discarded.
    fn put(&mut self, _ns: String, _key: String, _val: String) {}
}
635
/// Result of reading `manifest.cbor` from a pack archive: either the current
/// manifest format or the legacy one, each paired with the flows loaded for it.
enum ManifestLoad {
    /// Manifest decoded by `decode_pack_manifest`.
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// Manifest decoded as a legacy `greentic_pack::builder` manifest.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
646
647fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
648    let mut archive = ZipArchive::new(File::open(path)?)
649        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
650    let bytes = read_entry(&mut archive, "manifest.cbor")
651        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
652    match decode_pack_manifest(&bytes) {
653        Ok(manifest) => {
654            let cache = PackFlows::from_manifest(manifest.clone());
655            Ok(ManifestLoad::New {
656                manifest: Box::new(manifest),
657                flows: cache,
658            })
659        }
660        Err(err) => {
661            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
662            // Fall back to legacy pack manifest
663            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
664                .context("failed to decode legacy pack manifest from manifest.cbor")?;
665            let flows = load_legacy_flows(&mut archive, &legacy)?;
666            Ok(ManifestLoad::Legacy {
667                manifest: Box::new(legacy),
668                flows,
669            })
670        }
671    }
672}
673
674fn load_legacy_flows(
675    archive: &mut ZipArchive<File>,
676    manifest: &legacy_pack::PackManifest,
677) -> Result<PackFlows> {
678    let mut flows = HashMap::new();
679    let mut descriptors = Vec::new();
680
681    for entry in &manifest.flows {
682        let bytes = read_entry(archive, &entry.file_json)
683            .with_context(|| format!("missing flow json {}", entry.file_json))?;
684        let doc: FlowDoc = serde_json::from_slice(&bytes)
685            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
686        let normalized = normalize_flow_doc(doc);
687        let flow_ir = flow_doc_to_ir(normalized)?;
688        let flow = flow_ir_to_flow(flow_ir)?;
689
690        descriptors.push(FlowDescriptor {
691            id: entry.id.clone(),
692            flow_type: entry.kind.clone(),
693            profile: manifest.meta.pack_id.clone(),
694            version: manifest.meta.version.to_string(),
695            description: None,
696        });
697        flows.insert(entry.id.clone(), flow);
698    }
699
700    let mut entry_flows = manifest.meta.entry_flows.clone();
701    if entry_flows.is_empty() {
702        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
703    }
704    let metadata = PackMetadata {
705        pack_id: manifest.meta.pack_id.clone(),
706        version: manifest.meta.version.to_string(),
707        entry_flows,
708        secret_requirements: Vec::new(),
709    };
710
711    Ok(PackFlows {
712        descriptors,
713        flows,
714        metadata,
715    })
716}
717
/// Per-component store data: the host state plus the WASI context and
/// resource table handed out through `WasiView`.
pub struct ComponentState {
    /// Host-side implementation of the interfaces exposed to the component.
    pub host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table paired with the WASI context.
    resource_table: ResourceTable,
}
723
724impl ComponentState {
725    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
726        let wasi_ctx = policy
727            .instantiate()
728            .context("failed to build WASI context")?;
729        Ok(Self {
730            host,
731            wasi_ctx,
732            resource_table: ResourceTable::new(),
733        })
734    }
735
736    fn host_mut(&mut self) -> &mut HostState {
737        &mut self.host
738    }
739}
740
/// Component control interface; cancellation and yielding are not implemented.
impl control::Host for ComponentState {
    /// Always reports that the component should not cancel.
    fn should_cancel(&mut self) -> bool {
        false
    }

    /// Does nothing.
    fn yield_now(&mut self) {
        // no-op cooperative yield
    }
}
750
/// Registers the `greentic:component/control@0.4.0` instance on `linker`.
///
/// Two functions are wrapped, `should-cancel` and `yield-now`; both forward to
/// the `ComponentControlHost` implementation on the store's `ComponentState`.
///
/// # Errors
/// Propagates any error from creating the instance or wrapping a function.
fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
    let mut inst = linker.instance("greentic:component/control@0.4.0")?;
    inst.func_wrap(
        "should-cancel",
        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
            let host = caller.data_mut();
            // Single-element tuple: the function has one result.
            Ok((ComponentControlHost::should_cancel(host),))
        },
    )?;
    inst.func_wrap(
        "yield-now",
        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
            let host = caller.data_mut();
            ComponentControlHost::yield_now(host);
            Ok(())
        },
    )?;
    Ok(())
}
770
/// Registers the host interfaces on `linker`: WASI (via
/// `wasmtime_wasi::p2::add_to_linker_sync`) followed by the v1 host helpers.
///
/// Every v1 interface except the OAuth broker is enabled, and each one is
/// served by the `HostState` embedded in `ComponentState`.
///
/// # Errors
/// Propagates any failure from either registration call.
pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            // The OAuth broker is not registered through this helper.
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            state_store: Some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
788
/// Exposes the pieces of host state the OAuth broker integration reads.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// The host's default environment name.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the OAuth broker host state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, if one was provided.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
806
impl WasiView for ComponentState {
    /// Hands out mutable borrows of this state's WASI context and resource table.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi_ctx,
            table: &mut self.resource_table,
        }
    }
}
815
// NOTE(review): these manual `Send`/`Sync` impls carry no recorded SAFETY
// justification. They assert that every field of `ComponentState` (the
// `HostState`, `WasiCtx` and `ResourceTable`) may be moved to and shared
// between threads — confirm that this holds and document the invariant here.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
820
821impl PackRuntime {
822    #[allow(clippy::too_many_arguments)]
823    pub async fn load(
824        path: impl AsRef<Path>,
825        config: Arc<HostConfig>,
826        mocks: Option<Arc<MockLayer>>,
827        archive_source: Option<&Path>,
828        session_store: Option<DynSessionStore>,
829        state_store: Option<DynStateStore>,
830        wasi_policy: Arc<RunnerWasiPolicy>,
831        secrets: DynSecretsManager,
832        oauth_config: Option<OAuthBrokerConfig>,
833        verify_archive: bool,
834    ) -> Result<Self> {
835        let path = path.as_ref();
836        let (_pack_root, safe_path) = normalize_pack_path(path)?;
837        let is_component = safe_path
838            .extension()
839            .and_then(|ext| ext.to_str())
840            .map(|ext| ext.eq_ignore_ascii_case("wasm"))
841            .unwrap_or(false);
842        let archive_hint_path = if let Some(source) = archive_source {
843            let (_, normalized) = normalize_pack_path(source)?;
844            Some(normalized)
845        } else if is_component {
846            None
847        } else {
848            Some(safe_path.clone())
849        };
850        let archive_hint = archive_hint_path.as_deref();
851        if verify_archive {
852            let verify_target = archive_hint.unwrap_or(&safe_path);
853            verify::verify_pack(verify_target).await?;
854            tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
855        }
856        let engine = Engine::default();
857        let wasm_bytes = fs::read(&safe_path).await?;
858        let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
859            .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
860        let mut manifest = None;
861        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
862        let flows = if let Some(archive_path) = archive_hint {
863            match load_manifest_and_flows(archive_path) {
864                Ok(ManifestLoad::New {
865                    manifest: m,
866                    flows: cache,
867                }) => {
868                    metadata = cache.metadata.clone();
869                    manifest = Some(*m);
870                    Some(cache)
871                }
872                Ok(ManifestLoad::Legacy {
873                    manifest: m,
874                    flows: cache,
875                }) => {
876                    metadata = cache.metadata.clone();
877                    legacy_manifest = Some(m);
878                    Some(cache)
879                }
880                Err(err) => {
881                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
882                    None
883                }
884            }
885        } else {
886            None
887        };
888        let components = if let Some(archive_path) = archive_hint {
889            if let Some(new_manifest) = manifest.as_ref() {
890                match load_components_from_archive(&engine, archive_path, Some(new_manifest)) {
891                    Ok(map) => map,
892                    Err(err) => {
893                        warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
894                        HashMap::new()
895                    }
896                }
897            } else if let Some(legacy) = legacy_manifest.as_ref() {
898                match load_legacy_components_from_archive(&engine, archive_path, legacy) {
899                    Ok(map) => map,
900                    Err(err) => {
901                        warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
902                        HashMap::new()
903                    }
904                }
905            } else {
906                HashMap::new()
907            }
908        } else if is_component {
909            let name = safe_path
910                .file_stem()
911                .map(|s| s.to_string_lossy().to_string())
912                .unwrap_or_else(|| "component".to_string());
913            let component = Component::from_binary(&engine, &wasm_bytes)?;
914            let mut map = HashMap::new();
915            map.insert(
916                name.clone(),
917                PackComponent {
918                    name,
919                    version: metadata.version.clone(),
920                    component,
921                },
922            );
923            map
924        } else {
925            HashMap::new()
926        };
927        let http_client = Arc::clone(&HTTP_CLIENT);
928        Ok(Self {
929            path: safe_path,
930            archive_path: archive_hint.map(Path::to_path_buf),
931            config,
932            engine,
933            metadata,
934            manifest,
935            legacy_manifest,
936            mocks,
937            flows,
938            components,
939            http_client,
940            pre_cache: Mutex::new(HashMap::new()),
941            session_store,
942            state_store,
943            wasi_policy,
944            provider_registry: RwLock::new(None),
945            secrets,
946            oauth_config,
947        })
948    }
949
950    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
951        if let Some(cache) = &self.flows {
952            return Ok(cache.descriptors.clone());
953        }
954        if let Some(manifest) = &self.manifest {
955            let descriptors = manifest
956                .flows
957                .iter()
958                .map(|flow| FlowDescriptor {
959                    id: flow.id.as_str().to_string(),
960                    flow_type: flow_kind_to_str(flow.kind).to_string(),
961                    profile: manifest.pack_id.as_str().to_string(),
962                    version: manifest.version.to_string(),
963                    description: None,
964                })
965                .collect();
966            return Ok(descriptors);
967        }
968        Ok(Vec::new())
969    }
970
971    #[allow(dead_code)]
972    pub async fn run_flow(
973        &self,
974        flow_id: &str,
975        input: serde_json::Value,
976    ) -> Result<serde_json::Value> {
977        let pack = Arc::new(
978            PackRuntime::load(
979                &self.path,
980                Arc::clone(&self.config),
981                self.mocks.clone(),
982                self.archive_path.as_deref(),
983                self.session_store.clone(),
984                self.state_store.clone(),
985                Arc::clone(&self.wasi_policy),
986                self.secrets.clone(),
987                self.oauth_config.clone(),
988                false,
989            )
990            .await?,
991        );
992
993        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
994        let retry_config = self.config.retry_config().into();
995        let mocks = pack.mocks.as_deref();
996        let tenant = self.config.tenant.as_str();
997
998        let ctx = FlowContext {
999            tenant,
1000            flow_id,
1001            node_id: None,
1002            tool: None,
1003            action: None,
1004            session_id: None,
1005            provider_id: None,
1006            retry_config,
1007            observer: None,
1008            mocks,
1009        };
1010
1011        let execution = engine.execute(ctx, input).await?;
1012        match execution.status {
1013            FlowStatus::Completed => Ok(execution.output),
1014            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1015                "status": "pending",
1016                "reason": wait.reason,
1017                "resume": wait.snapshot,
1018                "response": execution.output,
1019            })),
1020        }
1021    }
1022
1023    pub async fn invoke_component(
1024        &self,
1025        component_ref: &str,
1026        ctx: ComponentExecCtx,
1027        operation: &str,
1028        _config_json: Option<String>,
1029        input_json: String,
1030    ) -> Result<Value> {
1031        let pack_component = self
1032            .components
1033            .get(component_ref)
1034            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1035
1036        let mut linker = Linker::new(&self.engine);
1037        register_all(&mut linker)?;
1038        add_component_control_to_linker(&mut linker)?;
1039        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1040        let pre: ComponentPre<ComponentState> = ComponentPre::new(pre_instance)?;
1041
1042        let host_state = HostState::new(
1043            Arc::clone(&self.config),
1044            Arc::clone(&self.http_client),
1045            self.mocks.clone(),
1046            self.session_store.clone(),
1047            self.state_store.clone(),
1048            Arc::clone(&self.secrets),
1049            self.oauth_config.clone(),
1050        )?;
1051        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1052        let mut store = wasmtime::Store::new(&self.engine, store_state);
1053        let bindings: crate::component_api::Component = pre.instantiate_async(&mut store).await?;
1054        let node = bindings.greentic_component_node();
1055
1056        let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1057
1058        match result {
1059            InvokeResult::Ok(body) => {
1060                if body.is_empty() {
1061                    return Ok(Value::Null);
1062                }
1063                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1064            }
1065            InvokeResult::Err(NodeError {
1066                code,
1067                message,
1068                retryable,
1069                backoff_ms,
1070                details,
1071            }) => {
1072                let mut obj = serde_json::Map::new();
1073                obj.insert("ok".into(), Value::Bool(false));
1074                let mut error = serde_json::Map::new();
1075                error.insert("code".into(), Value::String(code));
1076                error.insert("message".into(), Value::String(message));
1077                error.insert("retryable".into(), Value::Bool(retryable));
1078                if let Some(backoff) = backoff_ms {
1079                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1080                }
1081                if let Some(details) = details {
1082                    error.insert(
1083                        "details".into(),
1084                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1085                    );
1086                }
1087                obj.insert("error".into(), Value::Object(error));
1088                Ok(Value::Object(obj))
1089            }
1090        }
1091    }
1092
1093    pub fn resolve_provider(
1094        &self,
1095        provider_id: Option<&str>,
1096        provider_type: Option<&str>,
1097    ) -> Result<ProviderBinding> {
1098        let registry = self.provider_registry()?;
1099        registry.resolve(provider_id, provider_type)
1100    }
1101
1102    pub async fn invoke_provider(
1103        &self,
1104        binding: &ProviderBinding,
1105        _ctx: ComponentExecCtx,
1106        op: &str,
1107        input_json: Vec<u8>,
1108    ) -> Result<Value> {
1109        let component_ref = &binding.component_ref;
1110        let pack_component = self
1111            .components
1112            .get(component_ref)
1113            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1114
1115        let mut linker = Linker::new(&self.engine);
1116        register_all(&mut linker)?;
1117        add_component_control_to_linker(&mut linker)?;
1118        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1119        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1120
1121        let host_state = HostState::new(
1122            Arc::clone(&self.config),
1123            Arc::clone(&self.http_client),
1124            self.mocks.clone(),
1125            self.session_store.clone(),
1126            self.state_store.clone(),
1127            Arc::clone(&self.secrets),
1128            self.oauth_config.clone(),
1129        )?;
1130        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1131        let mut store = wasmtime::Store::new(&self.engine, store_state);
1132        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1133        let provider = bindings.greentic_provider_core_schema_core_api();
1134
1135        let result = provider.call_invoke(&mut store, op, &input_json)?;
1136        deserialize_json_bytes(result)
1137    }
1138
1139    fn provider_registry(&self) -> Result<ProviderRegistry> {
1140        if let Some(registry) = self.provider_registry.read().clone() {
1141            return Ok(registry);
1142        }
1143        let manifest = self
1144            .manifest
1145            .as_ref()
1146            .context("pack manifest required for provider resolution")?;
1147        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1148        let registry = ProviderRegistry::new(
1149            manifest,
1150            self.state_store.clone(),
1151            &self.config.tenant,
1152            &env,
1153        )?;
1154        *self.provider_registry.write() = Some(registry.clone());
1155        Ok(registry)
1156    }
1157
1158    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1159        if let Some(cache) = &self.flows {
1160            return cache
1161                .flows
1162                .get(flow_id)
1163                .cloned()
1164                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1165        }
1166        if let Some(manifest) = &self.manifest {
1167            let entry = manifest
1168                .flows
1169                .iter()
1170                .find(|f| f.id.as_str() == flow_id)
1171                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1172            return Ok(entry.flow.clone());
1173        }
1174        bail!("flow '{flow_id}' not available (pack exports disabled)")
1175    }
1176
/// Returns a reference to the `PackMetadata` held by this runtime.
1177    pub fn metadata(&self) -> &PackMetadata {
1178        &self.metadata
1179    }
1180
/// Returns the `secret_requirements` list stored in this pack's metadata.
1181    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1182        &self.metadata.secret_requirements
1183    }
1184
1185    pub fn missing_secrets(
1186        &self,
1187        tenant_ctx: &TypesTenantCtx,
1188    ) -> Vec<greentic_types::SecretRequirement> {
1189        let env = tenant_ctx.env.as_str().to_string();
1190        let tenant = tenant_ctx.tenant.as_str().to_string();
1191        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1192        self.required_secrets()
1193            .iter()
1194            .filter(|req| {
1195                // scope must match current context if provided
1196                if let Some(scope) = &req.scope {
1197                    if scope.env != env {
1198                        return false;
1199                    }
1200                    if scope.tenant != tenant {
1201                        return false;
1202                    }
1203                    if let Some(ref team_req) = scope.team
1204                        && team.as_ref() != Some(team_req)
1205                    {
1206                        return false;
1207                    }
1208                }
1209                read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1210            })
1211            .cloned()
1212            .collect()
1213    }
1214
1215    pub fn for_component_test(
1216        components: Vec<(String, PathBuf)>,
1217        flows: HashMap<String, FlowIR>,
1218        config: Arc<HostConfig>,
1219    ) -> Result<Self> {
1220        let engine = Engine::default();
1221        let mut component_map = HashMap::new();
1222        for (name, path) in components {
1223            if !path.exists() {
1224                bail!("component artifact missing: {}", path.display());
1225            }
1226            let wasm_bytes = std::fs::read(&path)?;
1227            let component = Component::from_binary(&engine, &wasm_bytes)
1228                .with_context(|| format!("failed to compile component {}", path.display()))?;
1229            component_map.insert(
1230                name.clone(),
1231                PackComponent {
1232                    name,
1233                    version: "0.0.0".into(),
1234                    component,
1235                },
1236            );
1237        }
1238
1239        let mut flow_map = HashMap::new();
1240        let mut descriptors = Vec::new();
1241        for (id, ir) in flows {
1242            let flow_type = ir.flow_type.clone();
1243            let flow = flow_ir_to_flow(ir)?;
1244            flow_map.insert(id.clone(), flow);
1245            descriptors.push(FlowDescriptor {
1246                id: id.clone(),
1247                flow_type,
1248                profile: "test".into(),
1249                version: "0.0.0".into(),
1250                description: None,
1251            });
1252        }
1253        let flows_cache = PackFlows {
1254            descriptors: descriptors.clone(),
1255            flows: flow_map,
1256            metadata: PackMetadata::fallback(Path::new("component-test")),
1257        };
1258
1259        Ok(Self {
1260            path: PathBuf::new(),
1261            archive_path: None,
1262            config,
1263            engine,
1264            metadata: PackMetadata::fallback(Path::new("component-test")),
1265            manifest: None,
1266            legacy_manifest: None,
1267            mocks: None,
1268            flows: Some(flows_cache),
1269            components: component_map,
1270            http_client: Arc::clone(&HTTP_CLIENT),
1271            pre_cache: Mutex::new(HashMap::new()),
1272            session_store: None,
1273            state_store: None,
1274            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1275            provider_registry: RwLock::new(None),
1276            secrets: crate::secrets::default_manager(),
1277            oauth_config: None,
1278        })
1279    }
1280}
1281
/// Flow data extracted from a pack: listing descriptors, the flow definitions
/// indexed by flow id, and the pack metadata taken from the same source.
1282struct PackFlows {
    /// One descriptor per flow; returned as-is when flows are listed.
1283    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
1284    flows: HashMap<String, Flow>,
    /// Pack metadata built alongside the flows.
1285    metadata: PackMetadata,
1286}
1287
1288fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1289    if bytes.is_empty() {
1290        return Ok(Value::Null);
1291    }
1292    serde_json::from_slice(&bytes).or_else(|_| {
1293        String::from_utf8(bytes)
1294            .map(Value::String)
1295            .map_err(|err| anyhow!(err))
1296    })
1297}
1298
1299impl PackFlows {
1300    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1301        let descriptors = manifest
1302            .flows
1303            .iter()
1304            .map(|entry| FlowDescriptor {
1305                id: entry.id.as_str().to_string(),
1306                flow_type: flow_kind_to_str(entry.kind).to_string(),
1307                profile: manifest.pack_id.as_str().to_string(),
1308                version: manifest.version.to_string(),
1309                description: None,
1310            })
1311            .collect();
1312        let mut flows = HashMap::new();
1313        for entry in &manifest.flows {
1314            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1315        }
1316        Self {
1317            metadata: PackMetadata::from_manifest(&manifest),
1318            descriptors,
1319            flows,
1320        }
1321    }
1322}
1323
1324fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1325    match kind {
1326        greentic_types::FlowKind::Messaging => "messaging",
1327        greentic_types::FlowKind::Event => "event",
1328        greentic_types::FlowKind::ComponentConfig => "component-config",
1329        greentic_types::FlowKind::Job => "job",
1330        greentic_types::FlowKind::Http => "http",
1331    }
1332}
1333
1334fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1335    let mut file = archive
1336        .by_name(name)
1337        .with_context(|| format!("entry {name} missing from archive"))?;
1338    let mut buf = Vec::new();
1339    file.read_to_end(&mut buf)?;
1340    Ok(buf)
1341}
1342
1343fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1344    for node in doc.nodes.values_mut() {
1345        if node.component.is_empty()
1346            && let Some((component_ref, payload)) = node.raw.iter().next()
1347        {
1348            if component_ref.starts_with("emit.") {
1349                node.component = component_ref.clone();
1350                node.payload = payload.clone();
1351                node.raw.clear();
1352                continue;
1353            }
1354            let (target_component, operation, input, config) =
1355                infer_component_exec(payload, component_ref);
1356            let mut payload_obj = serde_json::Map::new();
1357            // component.exec is meta; ensure the payload carries the actual target component.
1358            payload_obj.insert("component".into(), Value::String(target_component));
1359            payload_obj.insert("operation".into(), Value::String(operation));
1360            payload_obj.insert("input".into(), input);
1361            if let Some(cfg) = config {
1362                payload_obj.insert("config".into(), cfg);
1363            }
1364            node.component = "component.exec".to_string();
1365            node.payload = Value::Object(payload_obj);
1366        }
1367    }
1368    doc
1369}
1370
1371fn infer_component_exec(
1372    payload: &Value,
1373    component_ref: &str,
1374) -> (String, String, Value, Option<Value>) {
1375    let default_op = if component_ref.starts_with("templating.") {
1376        "render"
1377    } else {
1378        "invoke"
1379    }
1380    .to_string();
1381
1382    if let Value::Object(map) = payload {
1383        let op = map
1384            .get("op")
1385            .or_else(|| map.get("operation"))
1386            .and_then(Value::as_str)
1387            .map(|s| s.to_string())
1388            .unwrap_or_else(|| default_op.clone());
1389
1390        let mut input = map.clone();
1391        let config = input.remove("config");
1392        let component = input
1393            .get("component")
1394            .or_else(|| input.get("component_ref"))
1395            .and_then(Value::as_str)
1396            .map(|s| s.to_string())
1397            .unwrap_or_else(|| component_ref.to_string());
1398        input.remove("component");
1399        input.remove("component_ref");
1400        input.remove("op");
1401        input.remove("operation");
1402        return (component, op, Value::Object(input), config);
1403    }
1404
1405    (component_ref.to_string(), default_op, payload.clone(), None)
1406}
1407
// Unit tests for the flow-document normalization helpers in this file.
1408#[cfg(test)]
1409mod tests {
1410    use super::*;
1411    use greentic_flow::model::{FlowDoc, NodeDoc};
1412    use serde_json::json;
1413    use std::collections::BTreeMap;
1414
    // A node that only has a raw `templating.*` entry must be rewritten into a
    // `component.exec` node that targets that component with the `render`
    // operation and keeps the raw value as its input.
1415    #[test]
1416    fn normalizes_raw_component_to_component_exec() {
1417        let mut nodes = BTreeMap::new();
1418        let mut raw = BTreeMap::new();
1419        raw.insert(
1420            "templating.handlebars".into(),
1421            json!({ "template": "Hi {{name}}" }),
1422        );
1423        nodes.insert(
1424            "start".into(),
1425            NodeDoc {
1426                raw,
1427                routing: json!([{"out": true}]),
1428                ..Default::default()
1429            },
1430        );
1431        let doc = FlowDoc {
1432            id: "welcome".into(),
1433            title: None,
1434            description: None,
1435            flow_type: "messaging".into(),
1436            start: Some("start".into()),
1437            parameters: json!({}),
1438            tags: Vec::new(),
1439            entrypoints: BTreeMap::new(),
1440            nodes,
1441        };
1442
1443        let normalized = normalize_flow_doc(doc);
1444        let node = normalized.nodes.get("start").expect("node exists");
1445        assert_eq!(node.component, "component.exec");
        // Accepts either outcome for the raw map: cleared, or still holding the entry.
1446        assert!(node.raw.is_empty() || node.raw.contains_key("templating.handlebars"));
1447        let payload = node.payload.as_object().expect("payload object");
1448        assert_eq!(
1449            payload.get("component"),
1450            Some(&Value::String("templating.handlebars".into()))
1451        );
1452        assert_eq!(
1453            payload.get("operation"),
1454            Some(&Value::String("render".into()))
1455        );
1456        let input = payload.get("input").unwrap();
1457        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
1458    }
1459}
1460
1461fn load_components_from_archive(
1462    engine: &Engine,
1463    path: &Path,
1464    manifest: Option<&greentic_types::PackManifest>,
1465) -> Result<HashMap<String, PackComponent>> {
1466    let mut archive = ZipArchive::new(File::open(path)?)
1467        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1468    let mut components = HashMap::new();
1469    if let Some(manifest) = manifest {
1470        for entry in &manifest.components {
1471            let file_name = format!("components/{}.wasm", entry.id.as_str());
1472            let bytes = read_entry(&mut archive, &file_name)
1473                .with_context(|| format!("missing component {}", file_name))?;
1474            let component = Component::from_binary(engine, &bytes)
1475                .with_context(|| format!("failed to compile component {}", entry.id.as_str()))?;
1476            components.insert(
1477                entry.id.as_str().to_string(),
1478                PackComponent {
1479                    name: entry.id.as_str().to_string(),
1480                    version: entry.version.to_string(),
1481                    component,
1482                },
1483            );
1484        }
1485    }
1486    Ok(components)
1487}
1488
1489fn load_legacy_components_from_archive(
1490    engine: &Engine,
1491    path: &Path,
1492    manifest: &legacy_pack::PackManifest,
1493) -> Result<HashMap<String, PackComponent>> {
1494    let mut archive = ZipArchive::new(File::open(path)?)
1495        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1496    let mut components = HashMap::new();
1497    for entry in &manifest.components {
1498        let bytes = read_entry(&mut archive, &entry.file_wasm)
1499            .with_context(|| format!("missing component {}", entry.file_wasm))?;
1500        let component = Component::from_binary(engine, &bytes)
1501            .with_context(|| format!("failed to compile component {}", entry.name))?;
1502        components.insert(
1503            entry.name.clone(),
1504            PackComponent {
1505                name: entry.name.clone(),
1506                version: entry.version.to_string(),
1507                component,
1508            },
1509        );
1510    }
1511    Ok(components)
1512}
1513
/// Identifying information for a pack: its id, version, entry flows and the
/// secrets it requires.
1514#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1515pub struct PackMetadata {
    /// Pack identifier.
1516    pub pack_id: String,
    /// Pack version, kept as a string.
1517    pub version: String,
    /// Ids of the pack's entry flows; defaults to empty when absent from the
    /// serialized form.
1518    #[serde(default)]
1519    pub entry_flows: Vec<String>,
    /// Secrets the pack requires; defaults to empty when absent from the
    /// serialized form.
1520    #[serde(default)]
1521    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
1522}
1523
1524impl PackMetadata {
1525    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1526        let parser = Parser::new(0);
1527        for payload in parser.parse_all(bytes) {
1528            let payload = payload.ok()?;
1529            match payload {
1530                Payload::CustomSection(section) => {
1531                    if section.name() == "greentic.manifest"
1532                        && let Ok(meta) = Self::from_bytes(section.data())
1533                    {
1534                        return Some(meta);
1535                    }
1536                }
1537                Payload::DataSection(reader) => {
1538                    for segment in reader.into_iter().flatten() {
1539                        if let Ok(meta) = Self::from_bytes(segment.data) {
1540                            return Some(meta);
1541                        }
1542                    }
1543                }
1544                _ => {}
1545            }
1546        }
1547        None
1548    }
1549
1550    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1551        #[derive(Deserialize)]
1552        struct RawManifest {
1553            pack_id: String,
1554            version: String,
1555            #[serde(default)]
1556            entry_flows: Vec<String>,
1557            #[serde(default)]
1558            flows: Vec<RawFlow>,
1559            #[serde(default)]
1560            secret_requirements: Vec<greentic_types::SecretRequirement>,
1561        }
1562
1563        #[derive(Deserialize)]
1564        struct RawFlow {
1565            id: String,
1566        }
1567
1568        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1569        let mut entry_flows = if manifest.entry_flows.is_empty() {
1570            manifest.flows.iter().map(|f| f.id.clone()).collect()
1571        } else {
1572            manifest.entry_flows.clone()
1573        };
1574        entry_flows.retain(|id| !id.is_empty());
1575        Ok(Self {
1576            pack_id: manifest.pack_id,
1577            version: manifest.version,
1578            entry_flows,
1579            secret_requirements: manifest.secret_requirements,
1580        })
1581    }
1582
1583    pub fn fallback(path: &Path) -> Self {
1584        let pack_id = path
1585            .file_stem()
1586            .map(|s| s.to_string_lossy().into_owned())
1587            .unwrap_or_else(|| "unknown-pack".to_string());
1588        Self {
1589            pack_id,
1590            version: "0.0.0".to_string(),
1591            entry_flows: Vec::new(),
1592            secret_requirements: Vec::new(),
1593        }
1594    }
1595
1596    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
1597        let entry_flows = manifest
1598            .flows
1599            .iter()
1600            .map(|flow| flow.id.as_str().to_string())
1601            .collect::<Vec<_>>();
1602        Self {
1603            pack_id: manifest.pack_id.as_str().to_string(),
1604            version: manifest.version.to_string(),
1605            entry_flows,
1606            secret_requirements: manifest.secret_requirements.clone(),
1607        }
1608    }
1609}