greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
9use crate::component_api::{
10    ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::imports::register_all;
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, WasmResult};
15use anyhow::{Context, Result, anyhow, bail};
16use greentic_interfaces_host::host_import::v0_2 as host_import_v0_2;
17use greentic_interfaces_host::host_import::v0_2::greentic::host_import::imports::{
18    HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
19    IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
20};
21use greentic_interfaces_host::host_import::v0_6::{
22    self as host_import_v0_6, iface_types, state, types,
23};
24use greentic_pack::builder as legacy_pack;
25use greentic_session::SessionKey as StoreSessionKey;
26use greentic_types::{
27    EnvId, Flow, FlowId, SessionCursor as StoreSessionCursor, SessionData,
28    StateKey as StoreStateKey, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
29    decode_pack_manifest,
30};
31use once_cell::sync::Lazy;
32use parking_lot::Mutex;
33use reqwest::blocking::Client as BlockingClient;
34use runner_core::normalize_under_root;
35use serde::{Deserialize, Serialize};
36use serde_cbor;
37use serde_json::{self, Value};
38use tokio::fs;
39use wasmparser::{Parser, Payload};
40use wasmtime::StoreContextMut;
41use zip::ZipArchive;
42
43use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
44use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
45
46use crate::config::HostConfig;
47use crate::secrets::{DynSecretsManager, read_secret_blocking};
48use crate::storage::state::STATE_PREFIX;
49use crate::storage::{DynSessionStore, DynStateStore};
50use crate::verify;
51use crate::wasi::RunnerWasiPolicy;
52use tracing::warn;
53use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
54
55use greentic_flow::model::FlowDoc;
56
57#[allow(dead_code)]
58pub struct PackRuntime {
59    /// Component artifact path (wasm file).
60    path: PathBuf,
61    /// Optional archive (.gtpack) used to load flows/manifests.
62    archive_path: Option<PathBuf>,
63    config: Arc<HostConfig>,
64    engine: Engine,
65    metadata: PackMetadata,
66    manifest: Option<greentic_types::PackManifest>,
67    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
68    mocks: Option<Arc<MockLayer>>,
69    flows: Option<PackFlows>,
70    components: HashMap<String, PackComponent>,
71    http_client: Arc<BlockingClient>,
72    pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
73    session_store: Option<DynSessionStore>,
74    state_store: Option<DynStateStore>,
75    wasi_policy: Arc<RunnerWasiPolicy>,
76    secrets: DynSecretsManager,
77    oauth_config: Option<OAuthBrokerConfig>,
78}
79
80struct PackComponent {
81    #[allow(dead_code)]
82    name: String,
83    #[allow(dead_code)]
84    version: String,
85    component: Component,
86}
87
88fn build_blocking_client() -> BlockingClient {
89    std::thread::spawn(|| {
90        BlockingClient::builder()
91            .no_proxy()
92            .build()
93            .expect("blocking client")
94    })
95    .join()
96    .expect("client build thread panicked")
97}
98
99fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
100    let (root, candidate) = if path.is_absolute() {
101        let parent = path
102            .parent()
103            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
104        let root = parent
105            .canonicalize()
106            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
107        let file = path
108            .file_name()
109            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
110        (root, PathBuf::from(file))
111    } else {
112        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
113        let base = if let Some(parent) = path.parent() {
114            cwd.join(parent)
115        } else {
116            cwd
117        };
118        let root = base
119            .canonicalize()
120            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
121        let file = path
122            .file_name()
123            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
124        (root, PathBuf::from(file))
125    };
126    let safe = normalize_under_root(&root, &candidate)?;
127    Ok((root, safe))
128}
129
130static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct FlowDescriptor {
134    pub id: String,
135    #[serde(rename = "type")]
136    pub flow_type: String,
137    pub profile: String,
138    pub version: String,
139    #[serde(default)]
140    pub description: Option<String>,
141}
142
143pub struct HostState {
144    config: Arc<HostConfig>,
145    http_client: Arc<BlockingClient>,
146    default_env: String,
147    session_store: Option<DynSessionStore>,
148    state_store: Option<DynStateStore>,
149    mocks: Option<Arc<MockLayer>>,
150    secrets: DynSecretsManager,
151    oauth_config: Option<OAuthBrokerConfig>,
152    oauth_host: OAuthBrokerHost,
153}
154
155impl HostState {
156    #[allow(clippy::default_constructed_unit_structs)]
157    pub fn new(
158        config: Arc<HostConfig>,
159        http_client: Arc<BlockingClient>,
160        mocks: Option<Arc<MockLayer>>,
161        session_store: Option<DynSessionStore>,
162        state_store: Option<DynStateStore>,
163        secrets: DynSecretsManager,
164        oauth_config: Option<OAuthBrokerConfig>,
165    ) -> Result<Self> {
166        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
167        Ok(Self {
168            config,
169            http_client,
170            default_env,
171            session_store,
172            state_store,
173            mocks,
174            secrets,
175            oauth_config,
176            oauth_host: OAuthBrokerHost::default(),
177        })
178    }
179
180    pub fn get_secret(&self, key: &str) -> Result<String> {
181        if !self.config.secrets_policy.is_allowed(key) {
182            bail!("secret {key} is not permitted by bindings policy");
183        }
184        if let Some(mock) = &self.mocks
185            && let Some(value) = mock.secrets_lookup(key)
186        {
187            return Ok(value);
188        }
189        let bytes = read_secret_blocking(&self.secrets, key)
190            .context("failed to read secret from manager")?;
191        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
192        Ok(value)
193    }
194
195    fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
196        let tenant_raw = ctx
197            .as_ref()
198            .map(|ctx| ctx.tenant.clone())
199            .unwrap_or_else(|| self.config.tenant.clone());
200        let tenant_id = TenantId::from_str(&tenant_raw)
201            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
202        let env_id = EnvId::from_str(&self.default_env)
203            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
204        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
205        if let Some(ctx) = ctx {
206            if let Some(team) = ctx.team {
207                let team_id =
208                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
209                tenant_ctx = tenant_ctx.with_team(Some(team_id));
210            }
211            if let Some(user) = ctx.user {
212                let user_id =
213                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
214                tenant_ctx = tenant_ctx.with_user(Some(user_id));
215            }
216            if let Some(flow) = ctx.flow_id {
217                tenant_ctx = tenant_ctx.with_flow(flow);
218            }
219            if let Some(node) = ctx.node_id {
220                tenant_ctx = tenant_ctx.with_node(node);
221            }
222            if let Some(provider) = ctx.provider_id {
223                tenant_ctx = tenant_ctx.with_provider(provider);
224            }
225            if let Some(session) = ctx.session_id {
226                tenant_ctx = tenant_ctx.with_session(session);
227            }
228            tenant_ctx.trace_id = ctx.trace_id;
229        }
230        Ok(tenant_ctx)
231    }
232
233    fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
234        self.session_store
235            .as_ref()
236            .cloned()
237            .ok_or(types::IfaceError::Unavailable)
238    }
239
240    fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
241        self.state_store
242            .as_ref()
243            .cloned()
244            .ok_or(types::IfaceError::Unavailable)
245    }
246
247    fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
248        ctx.user_id
249            .clone()
250            .or_else(|| ctx.user.clone())
251            .ok_or(types::IfaceError::InvalidArg)
252    }
253
254    fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
255        let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
256        FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
257    }
258
259    fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
260        let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
261        if let Some(reason) = cursor.wait_reason {
262            store_cursor = store_cursor.with_wait_reason(reason);
263        }
264        if let Some(marker) = cursor.outbox_marker {
265            store_cursor = store_cursor.with_outbox_marker(marker);
266        }
267        store_cursor
268    }
269}
270
271enum ManifestLoad {
272    New {
273        manifest: Box<greentic_types::PackManifest>,
274        flows: PackFlows,
275    },
276    Legacy {
277        manifest: Box<legacy_pack::PackManifest>,
278        flows: PackFlows,
279    },
280}
281
282fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
283    let mut archive = ZipArchive::new(File::open(path)?)
284        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
285    let bytes = read_entry(&mut archive, "manifest.cbor")
286        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
287    match decode_pack_manifest(&bytes) {
288        Ok(manifest) => {
289            let cache = PackFlows::from_manifest(manifest.clone());
290            Ok(ManifestLoad::New {
291                manifest: Box::new(manifest),
292                flows: cache,
293            })
294        }
295        Err(err) => {
296            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
297            // Fall back to legacy pack manifest
298            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
299                .context("failed to decode legacy pack manifest from manifest.cbor")?;
300            let flows = load_legacy_flows(&mut archive, &legacy)?;
301            Ok(ManifestLoad::Legacy {
302                manifest: Box::new(legacy),
303                flows,
304            })
305        }
306    }
307}
308
309fn load_legacy_flows(
310    archive: &mut ZipArchive<File>,
311    manifest: &legacy_pack::PackManifest,
312) -> Result<PackFlows> {
313    let mut flows = HashMap::new();
314    let mut descriptors = Vec::new();
315
316    for entry in &manifest.flows {
317        let bytes = read_entry(archive, &entry.file_json)
318            .with_context(|| format!("missing flow json {}", entry.file_json))?;
319        let doc: FlowDoc = serde_json::from_slice(&bytes)
320            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
321        let normalized = normalize_flow_doc(doc);
322        let flow_ir = flow_doc_to_ir(normalized)?;
323        let flow = flow_ir_to_flow(flow_ir)?;
324
325        descriptors.push(FlowDescriptor {
326            id: entry.id.clone(),
327            flow_type: entry.kind.clone(),
328            profile: manifest.meta.pack_id.clone(),
329            version: manifest.meta.version.to_string(),
330            description: None,
331        });
332        flows.insert(entry.id.clone(), flow);
333    }
334
335    let mut entry_flows = manifest.meta.entry_flows.clone();
336    if entry_flows.is_empty() {
337        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
338    }
339    let metadata = PackMetadata {
340        pack_id: manifest.meta.pack_id.clone(),
341        version: manifest.meta.version.to_string(),
342        entry_flows,
343    };
344
345    Ok(PackFlows {
346        descriptors,
347        flows,
348        metadata,
349    })
350}
351
352pub struct ComponentState {
353    host: HostState,
354    wasi_ctx: WasiCtx,
355    resource_table: ResourceTable,
356}
357
358impl ComponentState {
359    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
360        let wasi_ctx = policy
361            .instantiate()
362            .context("failed to build WASI context")?;
363        Ok(Self {
364            host,
365            wasi_ctx,
366            resource_table: ResourceTable::new(),
367        })
368    }
369
370    fn host_mut(&mut self) -> &mut HostState {
371        &mut self.host
372    }
373}
374
375impl control::Host for ComponentState {
376    fn should_cancel(&mut self) -> bool {
377        false
378    }
379
380    fn yield_now(&mut self) {
381        // no-op cooperative yield
382    }
383}
384
385fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
386    let mut inst = linker.instance("greentic:component/control@0.4.0")?;
387    inst.func_wrap(
388        "should-cancel",
389        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
390            let host = caller.data_mut();
391            Ok((ComponentControlHost::should_cancel(host),))
392        },
393    )?;
394    inst.func_wrap(
395        "yield-now",
396        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
397            let host = caller.data_mut();
398            ComponentControlHost::yield_now(host);
399            Ok(())
400        },
401    )?;
402    Ok(())
403}
404
405impl OAuthHostContext for ComponentState {
406    fn tenant_id(&self) -> &str {
407        &self.host.config.tenant
408    }
409
410    fn env(&self) -> &str {
411        &self.host.default_env
412    }
413
414    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
415        &mut self.host.oauth_host
416    }
417
418    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
419        self.host.oauth_config.as_ref()
420    }
421}
422
423impl WasiView for ComponentState {
424    fn ctx(&mut self) -> WasiCtxView<'_> {
425        WasiCtxView {
426            ctx: &mut self.wasi_ctx,
427            table: &mut self.resource_table,
428        }
429    }
430}
431
432#[allow(unsafe_code)]
433unsafe impl Send for ComponentState {}
434#[allow(unsafe_code)]
435unsafe impl Sync for ComponentState {}
436
437impl host_import_v0_6::HostImports for ComponentState {
438    fn secrets_get(
439        &mut self,
440        key: String,
441        ctx: Option<types::TenantCtx>,
442    ) -> WasmResult<Result<String, types::IfaceError>> {
443        host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
444    }
445
446    fn telemetry_emit(
447        &mut self,
448        span_json: String,
449        ctx: Option<types::TenantCtx>,
450    ) -> WasmResult<()> {
451        host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
452    }
453
454    fn http_fetch(
455        &mut self,
456        req: host_import_v0_6::http::HttpRequest,
457        ctx: Option<types::TenantCtx>,
458    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
459        host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
460    }
461
462    fn mcp_exec(
463        &mut self,
464        component: String,
465        action: String,
466        args_json: String,
467        ctx: Option<types::TenantCtx>,
468    ) -> WasmResult<Result<String, types::IfaceError>> {
469        host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
470    }
471
472    fn state_get(
473        &mut self,
474        key: iface_types::StateKey,
475        ctx: Option<types::TenantCtx>,
476    ) -> WasmResult<Result<String, types::IfaceError>> {
477        host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
478    }
479
480    fn state_set(
481        &mut self,
482        key: iface_types::StateKey,
483        value_json: String,
484        ctx: Option<types::TenantCtx>,
485    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
486        host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
487    }
488
489    fn session_update(
490        &mut self,
491        cursor: iface_types::SessionCursor,
492        ctx: Option<types::TenantCtx>,
493    ) -> WasmResult<Result<String, types::IfaceError>> {
494        host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
495    }
496}
497
498impl host_import_v0_2::HostImports for ComponentState {
499    fn secrets_get(
500        &mut self,
501        key: String,
502        ctx: Option<LegacyTenantCtx>,
503    ) -> WasmResult<Result<String, LegacyIfaceError>> {
504        host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
505    }
506
507    fn telemetry_emit(
508        &mut self,
509        span_json: String,
510        ctx: Option<LegacyTenantCtx>,
511    ) -> WasmResult<()> {
512        host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
513    }
514
515    fn tool_invoke(
516        &mut self,
517        tool: String,
518        action: String,
519        args_json: String,
520        ctx: Option<LegacyTenantCtx>,
521    ) -> WasmResult<Result<String, LegacyIfaceError>> {
522        host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
523    }
524
525    fn http_fetch(
526        &mut self,
527        req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
528        ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
529    ) -> WasmResult<
530        Result<
531            host_import_v0_2::greentic::host_import::imports::HttpResponse,
532            host_import_v0_2::greentic::host_import::imports::IfaceError,
533        >,
534    > {
535        host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
536    }
537}
538
539impl host_import_v0_6::HostImports for HostState {
540    fn secrets_get(
541        &mut self,
542        key: String,
543        _ctx: Option<types::TenantCtx>,
544    ) -> WasmResult<Result<String, types::IfaceError>> {
545        Ok(self.get_secret(&key).map_err(|err| {
546            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
547            types::IfaceError::Denied
548        }))
549    }
550
551    fn telemetry_emit(
552        &mut self,
553        span_json: String,
554        _ctx: Option<types::TenantCtx>,
555    ) -> WasmResult<()> {
556        if let Some(mock) = &self.mocks
557            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
558        {
559            return Ok(());
560        }
561        tracing::info!(span = %span_json, "telemetry emit from pack");
562        Ok(())
563    }
564
565    fn http_fetch(
566        &mut self,
567        req: host_import_v0_6::http::HttpRequest,
568        _ctx: Option<types::TenantCtx>,
569    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
570        let legacy_req = LegacyHttpRequest {
571            method: req.method,
572            url: req.url,
573            headers_json: req.headers_json,
574            body: req.body,
575        };
576        match host_import_v0_2::HostImports::http_fetch(self, legacy_req, None)? {
577            Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
578                status: resp.status,
579                headers_json: resp.headers_json,
580                body: resp.body,
581            })),
582            Err(err) => Ok(Err(map_legacy_error(err))),
583        }
584    }
585
586    fn mcp_exec(
587        &mut self,
588        component: String,
589        action: String,
590        args_json: String,
591        ctx: Option<types::TenantCtx>,
592    ) -> WasmResult<Result<String, types::IfaceError>> {
593        let _ = (component, action, args_json, ctx);
594        tracing::warn!("mcp.exec requested but MCP bridge is removed; returning unavailable");
595        Ok(Err(types::IfaceError::Unavailable))
596    }
597
598    fn state_get(
599        &mut self,
600        key: iface_types::StateKey,
601        ctx: Option<types::TenantCtx>,
602    ) -> WasmResult<Result<String, types::IfaceError>> {
603        let store = match self.state_store_handle() {
604            Ok(store) => store,
605            Err(err) => return Ok(Err(err)),
606        };
607        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
608            Ok(ctx) => ctx,
609            Err(err) => {
610                tracing::warn!(error = %err, "invalid tenant context for state.get");
611                return Ok(Err(types::IfaceError::InvalidArg));
612            }
613        };
614        let key = StoreStateKey::from(key);
615        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
616            Ok(Some(value)) => {
617                let result = if let Some(text) = value.as_str() {
618                    text.to_string()
619                } else {
620                    serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
621                };
622                Ok(Ok(result))
623            }
624            Ok(None) => Ok(Err(types::IfaceError::NotFound)),
625            Err(err) => {
626                tracing::warn!(error = %err, "state.get failed");
627                Ok(Err(types::IfaceError::Internal))
628            }
629        }
630    }
631
632    fn state_set(
633        &mut self,
634        key: iface_types::StateKey,
635        value_json: String,
636        ctx: Option<types::TenantCtx>,
637    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
638        let store = match self.state_store_handle() {
639            Ok(store) => store,
640            Err(err) => return Ok(Err(err)),
641        };
642        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
643            Ok(ctx) => ctx,
644            Err(err) => {
645                tracing::warn!(error = %err, "invalid tenant context for state.set");
646                return Ok(Err(types::IfaceError::InvalidArg));
647            }
648        };
649        let key = StoreStateKey::from(key);
650        let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
651        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
652            Ok(()) => Ok(Ok(state::OpAck::Ok)),
653            Err(err) => {
654                tracing::warn!(error = %err, "state.set failed");
655                Ok(Err(types::IfaceError::Internal))
656            }
657        }
658    }
659
660    fn session_update(
661        &mut self,
662        cursor: iface_types::SessionCursor,
663        ctx: Option<types::TenantCtx>,
664    ) -> WasmResult<Result<String, types::IfaceError>> {
665        let store = match self.session_store_handle() {
666            Ok(store) => store,
667            Err(err) => return Ok(Err(err)),
668        };
669        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
670            Ok(ctx) => ctx,
671            Err(err) => {
672                tracing::warn!(error = %err, "invalid tenant context for session.update");
673                return Ok(Err(types::IfaceError::InvalidArg));
674            }
675        };
676        let user = match Self::ensure_user(&tenant_ctx) {
677            Ok(user) => user,
678            Err(err) => return Ok(Err(err)),
679        };
680        let flow_id = match Self::ensure_flow(&tenant_ctx) {
681            Ok(flow) => flow,
682            Err(err) => return Ok(Err(err)),
683        };
684        let cursor = Self::cursor_from_iface(cursor);
685        let payload = SessionData {
686            tenant_ctx: tenant_ctx.clone(),
687            flow_id,
688            cursor: cursor.clone(),
689            context_json: serde_json::json!({
690                "node_pointer": cursor.node_pointer,
691                "wait_reason": cursor.wait_reason,
692                "outbox_marker": cursor.outbox_marker,
693            })
694            .to_string(),
695        };
696        if let Some(existing) = tenant_ctx.session_id() {
697            let key = StoreSessionKey::from(existing.to_string());
698            if let Err(err) = store.update_session(&key, payload) {
699                tracing::error!(error = %err, "failed to update session snapshot");
700                return Ok(Err(types::IfaceError::Internal));
701            }
702            return Ok(Ok(existing.to_string()));
703        }
704        match store.find_by_user(&tenant_ctx, &user) {
705            Ok(Some((key, _))) => {
706                if let Err(err) = store.update_session(&key, payload) {
707                    tracing::error!(error = %err, "failed to update existing user session");
708                    return Ok(Err(types::IfaceError::Internal));
709                }
710                return Ok(Ok(key.to_string()));
711            }
712            Ok(None) => {}
713            Err(err) => {
714                tracing::error!(error = %err, "session lookup failed");
715                return Ok(Err(types::IfaceError::Internal));
716            }
717        }
718        let key = match store.create_session(&tenant_ctx, payload.clone()) {
719            Ok(key) => key,
720            Err(err) => {
721                tracing::error!(error = %err, "failed to create session");
722                return Ok(Err(types::IfaceError::Internal));
723            }
724        };
725        let ctx_with_session = tenant_ctx.with_session(key.to_string());
726        let updated_payload = SessionData {
727            tenant_ctx: ctx_with_session.clone(),
728            ..payload
729        };
730        if let Err(err) = store.update_session(&key, updated_payload) {
731            tracing::warn!(error = %err, "failed to stamp session id after create");
732        }
733        Ok(Ok(key.to_string()))
734    }
735}
736
737impl host_import_v0_2::HostImports for HostState {
738    fn secrets_get(
739        &mut self,
740        key: String,
741        _ctx: Option<LegacyTenantCtx>,
742    ) -> WasmResult<Result<String, LegacyIfaceError>> {
743        Ok(self.get_secret(&key).map_err(|err| {
744            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
745            LegacyIfaceError::Denied
746        }))
747    }
748
749    fn telemetry_emit(
750        &mut self,
751        span_json: String,
752        _ctx: Option<LegacyTenantCtx>,
753    ) -> WasmResult<()> {
754        if let Some(mock) = &self.mocks
755            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
756        {
757            return Ok(());
758        }
759        tracing::info!(span = %span_json, "telemetry emit from pack");
760        Ok(())
761    }
762
763    fn tool_invoke(
764        &mut self,
765        tool: String,
766        action: String,
767        args_json: String,
768        ctx: Option<LegacyTenantCtx>,
769    ) -> WasmResult<Result<String, LegacyIfaceError>> {
770        let _ = (tool, action, args_json, ctx);
771        tracing::warn!("tool invoke requested but MCP bridge is removed; returning unavailable");
772        Ok(Err(LegacyIfaceError::Unavailable))
773    }
774
775    fn http_fetch(
776        &mut self,
777        req: LegacyHttpRequest,
778        _ctx: Option<LegacyTenantCtx>,
779    ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
780        if !self.config.http_enabled {
781            tracing::warn!(url = %req.url, "http fetch denied by policy");
782            return Ok(Err(LegacyIfaceError::Denied));
783        }
784
785        let mut mock_state = None;
786        let raw_body = req.body.clone();
787        if let Some(mock) = &self.mocks
788            && let Ok(meta) = HttpMockRequest::new(
789                &req.method,
790                &req.url,
791                raw_body.as_deref().map(|body| body.as_bytes()),
792            )
793        {
794            match mock.http_begin(&meta) {
795                HttpDecision::Mock(response) => {
796                    let http = LegacyHttpResponse::from(&response);
797                    return Ok(Ok(http));
798                }
799                HttpDecision::Deny(reason) => {
800                    tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
801                    return Ok(Err(LegacyIfaceError::Denied));
802                }
803                HttpDecision::Passthrough { record } => {
804                    mock_state = Some((meta, record));
805                }
806            }
807        }
808
809        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
810        let mut builder = self.http_client.request(method, &req.url);
811
812        if let Some(headers_json) = req.headers_json.as_ref() {
813            match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
814                Ok(map) => {
815                    for (key, value) in map {
816                        if let Some(val) = value.as_str()
817                            && let Ok(header) =
818                                reqwest::header::HeaderName::from_bytes(key.as_bytes())
819                            && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
820                        {
821                            builder = builder.header(header, header_value);
822                        }
823                    }
824                }
825                Err(err) => {
826                    tracing::warn!(error = %err, "failed to parse headers for http.fetch");
827                }
828            }
829        }
830
831        if let Some(body) = raw_body.clone() {
832            builder = builder.body(body);
833        }
834
835        let response = match builder.send() {
836            Ok(resp) => resp,
837            Err(err) => {
838                tracing::error!(url = %req.url, error = %err, "http fetch failed");
839                return Ok(Err(LegacyIfaceError::Unavailable));
840            }
841        };
842
843        let status = response.status().as_u16();
844        let headers_map = response
845            .headers()
846            .iter()
847            .map(|(k, v)| {
848                (
849                    k.as_str().to_string(),
850                    v.to_str().unwrap_or_default().to_string(),
851                )
852            })
853            .collect::<BTreeMap<_, _>>();
854        let headers_json = serde_json::to_string(&headers_map).ok();
855        let body = response.text().ok();
856
857        if let Some((meta, true)) = mock_state.take()
858            && let Some(mock) = &self.mocks
859        {
860            let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
861            mock.http_record(&meta, &recorded);
862        }
863
864        Ok(Ok(LegacyHttpResponse {
865            status,
866            headers_json,
867            body,
868        }))
869    }
870}
871
872impl PackRuntime {
873    #[allow(clippy::too_many_arguments)]
874    pub async fn load(
875        path: impl AsRef<Path>,
876        config: Arc<HostConfig>,
877        mocks: Option<Arc<MockLayer>>,
878        archive_source: Option<&Path>,
879        session_store: Option<DynSessionStore>,
880        state_store: Option<DynStateStore>,
881        wasi_policy: Arc<RunnerWasiPolicy>,
882        secrets: DynSecretsManager,
883        oauth_config: Option<OAuthBrokerConfig>,
884        verify_archive: bool,
885    ) -> Result<Self> {
886        let path = path.as_ref();
887        let (_pack_root, safe_path) = normalize_pack_path(path)?;
888        let is_component = safe_path
889            .extension()
890            .and_then(|ext| ext.to_str())
891            .map(|ext| ext.eq_ignore_ascii_case("wasm"))
892            .unwrap_or(false);
893        let archive_hint_path = if let Some(source) = archive_source {
894            let (_, normalized) = normalize_pack_path(source)?;
895            Some(normalized)
896        } else if is_component {
897            None
898        } else {
899            Some(safe_path.clone())
900        };
901        let archive_hint = archive_hint_path.as_deref();
902        if verify_archive {
903            let verify_target = archive_hint.unwrap_or(&safe_path);
904            verify::verify_pack(verify_target).await?;
905            tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
906        }
907        let engine = Engine::default();
908        let wasm_bytes = fs::read(&safe_path).await?;
909        let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
910            .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
911        let mut manifest = None;
912        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
913        let flows = if let Some(archive_path) = archive_hint {
914            match load_manifest_and_flows(archive_path) {
915                Ok(ManifestLoad::New {
916                    manifest: m,
917                    flows: cache,
918                }) => {
919                    metadata = cache.metadata.clone();
920                    manifest = Some(*m);
921                    Some(cache)
922                }
923                Ok(ManifestLoad::Legacy {
924                    manifest: m,
925                    flows: cache,
926                }) => {
927                    metadata = cache.metadata.clone();
928                    legacy_manifest = Some(m);
929                    Some(cache)
930                }
931                Err(err) => {
932                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
933                    None
934                }
935            }
936        } else {
937            None
938        };
939        let components = if let Some(archive_path) = archive_hint {
940            if let Some(new_manifest) = manifest.as_ref() {
941                match load_components_from_archive(&engine, archive_path, Some(new_manifest)) {
942                    Ok(map) => map,
943                    Err(err) => {
944                        warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
945                        HashMap::new()
946                    }
947                }
948            } else if let Some(legacy) = legacy_manifest.as_ref() {
949                match load_legacy_components_from_archive(&engine, archive_path, legacy) {
950                    Ok(map) => map,
951                    Err(err) => {
952                        warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
953                        HashMap::new()
954                    }
955                }
956            } else {
957                HashMap::new()
958            }
959        } else if is_component {
960            let name = safe_path
961                .file_stem()
962                .map(|s| s.to_string_lossy().to_string())
963                .unwrap_or_else(|| "component".to_string());
964            let component = Component::from_binary(&engine, &wasm_bytes)?;
965            let mut map = HashMap::new();
966            map.insert(
967                name.clone(),
968                PackComponent {
969                    name,
970                    version: metadata.version.clone(),
971                    component,
972                },
973            );
974            map
975        } else {
976            HashMap::new()
977        };
978        let http_client = Arc::clone(&HTTP_CLIENT);
979        Ok(Self {
980            path: safe_path,
981            archive_path: archive_hint.map(Path::to_path_buf),
982            config,
983            engine,
984            metadata,
985            manifest,
986            legacy_manifest,
987            mocks,
988            flows,
989            components,
990            http_client,
991            pre_cache: Mutex::new(HashMap::new()),
992            session_store,
993            state_store,
994            wasi_policy,
995            secrets,
996            oauth_config,
997        })
998    }
999
1000    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1001        if let Some(cache) = &self.flows {
1002            return Ok(cache.descriptors.clone());
1003        }
1004        if let Some(manifest) = &self.manifest {
1005            let descriptors = manifest
1006                .flows
1007                .iter()
1008                .map(|flow| FlowDescriptor {
1009                    id: flow.id.as_str().to_string(),
1010                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1011                    profile: manifest.pack_id.as_str().to_string(),
1012                    version: manifest.version.to_string(),
1013                    description: None,
1014                })
1015                .collect();
1016            return Ok(descriptors);
1017        }
1018        Ok(Vec::new())
1019    }
1020
1021    #[allow(dead_code)]
1022    pub async fn run_flow(
1023        &self,
1024        _flow_id: &str,
1025        _input: serde_json::Value,
1026    ) -> Result<serde_json::Value> {
1027        // TODO: dispatch flow execution via Wasmtime
1028        Ok(serde_json::json!({}))
1029    }
1030
1031    pub async fn invoke_component(
1032        &self,
1033        component_ref: &str,
1034        ctx: ComponentExecCtx,
1035        operation: &str,
1036        _config_json: Option<String>,
1037        input_json: String,
1038    ) -> Result<Value> {
1039        let pack_component = self
1040            .components
1041            .get(component_ref)
1042            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1043
1044        let pre = if let Some(pre) = self.pre_cache.lock().get(component_ref).cloned() {
1045            pre
1046        } else {
1047            let mut linker = Linker::new(&self.engine);
1048            register_all(&mut linker, self.oauth_config.is_some())?;
1049            add_component_control_to_linker(&mut linker)?;
1050            let pre = ComponentPre::new(
1051                linker
1052                    .instantiate_pre(&pack_component.component)
1053                    .map_err(|err| anyhow!(err))?,
1054            )
1055            .map_err(|err| anyhow!(err))?;
1056            self.pre_cache
1057                .lock()
1058                .insert(component_ref.to_string(), pre.clone());
1059            pre
1060        };
1061
1062        let host_state = HostState::new(
1063            Arc::clone(&self.config),
1064            Arc::clone(&self.http_client),
1065            self.mocks.clone(),
1066            self.session_store.clone(),
1067            self.state_store.clone(),
1068            Arc::clone(&self.secrets),
1069            self.oauth_config.clone(),
1070        )?;
1071        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1072        let mut store = wasmtime::Store::new(&self.engine, store_state);
1073        let bindings = pre
1074            .instantiate_async(&mut store)
1075            .await
1076            .map_err(|err| anyhow!(err))?;
1077        let node = bindings.greentic_component_node();
1078
1079        let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1080
1081        match result {
1082            InvokeResult::Ok(body) => {
1083                if body.is_empty() {
1084                    return Ok(Value::Null);
1085                }
1086                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1087            }
1088            InvokeResult::Err(NodeError {
1089                code,
1090                message,
1091                retryable,
1092                backoff_ms,
1093                details,
1094            }) => {
1095                let mut obj = serde_json::Map::new();
1096                obj.insert("ok".into(), Value::Bool(false));
1097                let mut error = serde_json::Map::new();
1098                error.insert("code".into(), Value::String(code));
1099                error.insert("message".into(), Value::String(message));
1100                error.insert("retryable".into(), Value::Bool(retryable));
1101                if let Some(backoff) = backoff_ms {
1102                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1103                }
1104                if let Some(details) = details {
1105                    error.insert(
1106                        "details".into(),
1107                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1108                    );
1109                }
1110                obj.insert("error".into(), Value::Object(error));
1111                Ok(Value::Object(obj))
1112            }
1113        }
1114    }
1115
1116    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1117        if let Some(cache) = &self.flows {
1118            return cache
1119                .flows
1120                .get(flow_id)
1121                .cloned()
1122                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1123        }
1124        if let Some(manifest) = &self.manifest {
1125            let entry = manifest
1126                .flows
1127                .iter()
1128                .find(|f| f.id.as_str() == flow_id)
1129                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1130            return Ok(entry.flow.clone());
1131        }
1132        bail!("flow '{flow_id}' not available (pack exports disabled)")
1133    }
1134
1135    pub fn metadata(&self) -> &PackMetadata {
1136        &self.metadata
1137    }
1138
1139    pub fn for_component_test(
1140        components: Vec<(String, PathBuf)>,
1141        flows: HashMap<String, FlowIR>,
1142        config: Arc<HostConfig>,
1143    ) -> Result<Self> {
1144        let engine = Engine::default();
1145        let mut component_map = HashMap::new();
1146        for (name, path) in components {
1147            if !path.exists() {
1148                bail!("component artifact missing: {}", path.display());
1149            }
1150            let wasm_bytes = std::fs::read(&path)?;
1151            let component = Component::from_binary(&engine, &wasm_bytes)
1152                .with_context(|| format!("failed to compile component {}", path.display()))?;
1153            component_map.insert(
1154                name.clone(),
1155                PackComponent {
1156                    name,
1157                    version: "0.0.0".into(),
1158                    component,
1159                },
1160            );
1161        }
1162
1163        let mut flow_map = HashMap::new();
1164        let mut descriptors = Vec::new();
1165        for (id, ir) in flows {
1166            let flow_type = ir.flow_type.clone();
1167            let flow = flow_ir_to_flow(ir)?;
1168            flow_map.insert(id.clone(), flow);
1169            descriptors.push(FlowDescriptor {
1170                id: id.clone(),
1171                flow_type,
1172                profile: "test".into(),
1173                version: "0.0.0".into(),
1174                description: None,
1175            });
1176        }
1177        let flows_cache = PackFlows {
1178            descriptors: descriptors.clone(),
1179            flows: flow_map,
1180            metadata: PackMetadata::fallback(Path::new("component-test")),
1181        };
1182
1183        Ok(Self {
1184            path: PathBuf::new(),
1185            archive_path: None,
1186            config,
1187            engine,
1188            metadata: PackMetadata::fallback(Path::new("component-test")),
1189            manifest: None,
1190            legacy_manifest: None,
1191            mocks: None,
1192            flows: Some(flows_cache),
1193            components: component_map,
1194            http_client: Arc::clone(&HTTP_CLIENT),
1195            pre_cache: Mutex::new(HashMap::new()),
1196            session_store: None,
1197            state_store: None,
1198            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1199            secrets: crate::secrets::default_manager(),
1200            oauth_config: None,
1201        })
1202    }
1203}
1204
1205fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
1206    match err {
1207        LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
1208        LegacyIfaceError::NotFound => types::IfaceError::NotFound,
1209        LegacyIfaceError::Denied => types::IfaceError::Denied,
1210        LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
1211        LegacyIfaceError::Internal => types::IfaceError::Internal,
1212    }
1213}
1214
1215struct PackFlows {
1216    descriptors: Vec<FlowDescriptor>,
1217    flows: HashMap<String, Flow>,
1218    metadata: PackMetadata,
1219}
1220
1221impl PackFlows {
1222    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1223        let descriptors = manifest
1224            .flows
1225            .iter()
1226            .map(|entry| FlowDescriptor {
1227                id: entry.id.as_str().to_string(),
1228                flow_type: flow_kind_to_str(entry.kind).to_string(),
1229                profile: manifest.pack_id.as_str().to_string(),
1230                version: manifest.version.to_string(),
1231                description: None,
1232            })
1233            .collect();
1234        let mut flows = HashMap::new();
1235        for entry in &manifest.flows {
1236            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1237        }
1238        Self {
1239            metadata: PackMetadata::from_manifest(&manifest),
1240            descriptors,
1241            flows,
1242        }
1243    }
1244}
1245
1246fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1247    match kind {
1248        greentic_types::FlowKind::Messaging => "messaging",
1249        greentic_types::FlowKind::Event => "event",
1250        greentic_types::FlowKind::ComponentConfig => "component-config",
1251        greentic_types::FlowKind::Job => "job",
1252        greentic_types::FlowKind::Http => "http",
1253    }
1254}
1255
1256fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1257    let mut file = archive
1258        .by_name(name)
1259        .with_context(|| format!("entry {name} missing from archive"))?;
1260    let mut buf = Vec::new();
1261    file.read_to_end(&mut buf)?;
1262    Ok(buf)
1263}
1264
1265fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1266    for node in doc.nodes.values_mut() {
1267        if node.component.is_empty()
1268            && let Some((component_ref, payload)) = node.raw.iter().next()
1269        {
1270            if component_ref.starts_with("emit.") {
1271                node.component = component_ref.clone();
1272                node.payload = payload.clone();
1273                node.raw.clear();
1274                continue;
1275            }
1276            let (target_component, operation, input, config) =
1277                infer_component_exec(payload, component_ref);
1278            let mut payload_obj = serde_json::Map::new();
1279            // component.exec is meta; ensure the payload carries the actual target component.
1280            payload_obj.insert("component".into(), Value::String(target_component));
1281            payload_obj.insert("operation".into(), Value::String(operation));
1282            payload_obj.insert("input".into(), input);
1283            if let Some(cfg) = config {
1284                payload_obj.insert("config".into(), cfg);
1285            }
1286            node.component = "component.exec".to_string();
1287            node.payload = Value::Object(payload_obj);
1288        }
1289    }
1290    doc
1291}
1292
1293fn infer_component_exec(
1294    payload: &Value,
1295    component_ref: &str,
1296) -> (String, String, Value, Option<Value>) {
1297    let default_op = if component_ref.starts_with("templating.") {
1298        "render"
1299    } else {
1300        "invoke"
1301    }
1302    .to_string();
1303
1304    if let Value::Object(map) = payload {
1305        let op = map
1306            .get("op")
1307            .or_else(|| map.get("operation"))
1308            .and_then(Value::as_str)
1309            .map(|s| s.to_string())
1310            .unwrap_or_else(|| default_op.clone());
1311
1312        let mut input = map.clone();
1313        let config = input.remove("config");
1314        let component = input
1315            .get("component")
1316            .or_else(|| input.get("component_ref"))
1317            .and_then(Value::as_str)
1318            .map(|s| s.to_string())
1319            .unwrap_or_else(|| component_ref.to_string());
1320        input.remove("component");
1321        input.remove("component_ref");
1322        input.remove("op");
1323        input.remove("operation");
1324        return (component, op, Value::Object(input), config);
1325    }
1326
1327    (component_ref.to_string(), default_op, payload.clone(), None)
1328}
1329
1330#[cfg(test)]
1331mod tests {
1332    use super::*;
1333    use greentic_flow::model::{FlowDoc, NodeDoc};
1334    use serde_json::json;
1335    use std::collections::BTreeMap;
1336
1337    #[test]
1338    fn normalizes_raw_component_to_component_exec() {
1339        let mut nodes = BTreeMap::new();
1340        let mut raw = BTreeMap::new();
1341        raw.insert(
1342            "templating.handlebars".into(),
1343            json!({ "template": "Hi {{name}}" }),
1344        );
1345        nodes.insert(
1346            "start".into(),
1347            NodeDoc {
1348                raw,
1349                routing: json!([{"out": true}]),
1350                ..Default::default()
1351            },
1352        );
1353        let doc = FlowDoc {
1354            id: "welcome".into(),
1355            title: None,
1356            description: None,
1357            flow_type: "messaging".into(),
1358            start: Some("start".into()),
1359            parameters: json!({}),
1360            tags: Vec::new(),
1361            entrypoints: BTreeMap::new(),
1362            nodes,
1363        };
1364
1365        let normalized = normalize_flow_doc(doc);
1366        let node = normalized.nodes.get("start").expect("node exists");
1367        assert_eq!(node.component, "component.exec");
1368        assert!(node.raw.is_empty() || node.raw.contains_key("templating.handlebars"));
1369        let payload = node.payload.as_object().expect("payload object");
1370        assert_eq!(
1371            payload.get("component"),
1372            Some(&Value::String("templating.handlebars".into()))
1373        );
1374        assert_eq!(
1375            payload.get("operation"),
1376            Some(&Value::String("render".into()))
1377        );
1378        let input = payload.get("input").unwrap();
1379        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
1380    }
1381}
1382
1383fn load_components_from_archive(
1384    engine: &Engine,
1385    path: &Path,
1386    manifest: Option<&greentic_types::PackManifest>,
1387) -> Result<HashMap<String, PackComponent>> {
1388    let mut archive = ZipArchive::new(File::open(path)?)
1389        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1390    let mut components = HashMap::new();
1391    if let Some(manifest) = manifest {
1392        for entry in &manifest.components {
1393            let file_name = format!("components/{}.wasm", entry.id.as_str());
1394            let bytes = read_entry(&mut archive, &file_name)
1395                .with_context(|| format!("missing component {}", file_name))?;
1396            let component = Component::from_binary(engine, &bytes)
1397                .with_context(|| format!("failed to compile component {}", entry.id.as_str()))?;
1398            components.insert(
1399                entry.id.as_str().to_string(),
1400                PackComponent {
1401                    name: entry.id.as_str().to_string(),
1402                    version: entry.version.to_string(),
1403                    component,
1404                },
1405            );
1406        }
1407    }
1408    Ok(components)
1409}
1410
1411fn load_legacy_components_from_archive(
1412    engine: &Engine,
1413    path: &Path,
1414    manifest: &legacy_pack::PackManifest,
1415) -> Result<HashMap<String, PackComponent>> {
1416    let mut archive = ZipArchive::new(File::open(path)?)
1417        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1418    let mut components = HashMap::new();
1419    for entry in &manifest.components {
1420        let bytes = read_entry(&mut archive, &entry.file_wasm)
1421            .with_context(|| format!("missing component {}", entry.file_wasm))?;
1422        let component = Component::from_binary(engine, &bytes)
1423            .with_context(|| format!("failed to compile component {}", entry.name))?;
1424        components.insert(
1425            entry.name.clone(),
1426            PackComponent {
1427                name: entry.name.clone(),
1428                version: entry.version.to_string(),
1429                component,
1430            },
1431        );
1432    }
1433    Ok(components)
1434}
1435
1436#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1437pub struct PackMetadata {
1438    pub pack_id: String,
1439    pub version: String,
1440    #[serde(default)]
1441    pub entry_flows: Vec<String>,
1442}
1443
1444impl PackMetadata {
1445    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1446        let parser = Parser::new(0);
1447        for payload in parser.parse_all(bytes) {
1448            let payload = payload.ok()?;
1449            match payload {
1450                Payload::CustomSection(section) => {
1451                    if section.name() == "greentic.manifest"
1452                        && let Ok(meta) = Self::from_bytes(section.data())
1453                    {
1454                        return Some(meta);
1455                    }
1456                }
1457                Payload::DataSection(reader) => {
1458                    for segment in reader.into_iter().flatten() {
1459                        if let Ok(meta) = Self::from_bytes(segment.data) {
1460                            return Some(meta);
1461                        }
1462                    }
1463                }
1464                _ => {}
1465            }
1466        }
1467        None
1468    }
1469
1470    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1471        #[derive(Deserialize)]
1472        struct RawManifest {
1473            pack_id: String,
1474            version: String,
1475            #[serde(default)]
1476            entry_flows: Vec<String>,
1477            #[serde(default)]
1478            flows: Vec<RawFlow>,
1479        }
1480
1481        #[derive(Deserialize)]
1482        struct RawFlow {
1483            id: String,
1484        }
1485
1486        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1487        let mut entry_flows = if manifest.entry_flows.is_empty() {
1488            manifest.flows.iter().map(|f| f.id.clone()).collect()
1489        } else {
1490            manifest.entry_flows.clone()
1491        };
1492        entry_flows.retain(|id| !id.is_empty());
1493        Ok(Self {
1494            pack_id: manifest.pack_id,
1495            version: manifest.version,
1496            entry_flows,
1497        })
1498    }
1499
1500    pub fn fallback(path: &Path) -> Self {
1501        let pack_id = path
1502            .file_stem()
1503            .map(|s| s.to_string_lossy().into_owned())
1504            .unwrap_or_else(|| "unknown-pack".to_string());
1505        Self {
1506            pack_id,
1507            version: "0.0.0".to_string(),
1508            entry_flows: Vec::new(),
1509        }
1510    }
1511
1512    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
1513        let entry_flows = manifest
1514            .flows
1515            .iter()
1516            .map(|flow| flow.id.as_str().to_string())
1517            .collect::<Vec<_>>();
1518        Self {
1519            pack_id: manifest.pack_id.as_str().to_string(),
1520            version: manifest.version.to_string(),
1521            entry_flows,
1522        }
1523    }
1524}
1525
1526impl From<&HttpMockResponse> for LegacyHttpResponse {
1527    fn from(value: &HttpMockResponse) -> Self {
1528        let headers_json = serde_json::to_string(&value.headers).ok();
1529        Self {
1530            status: value.status,
1531            headers_json,
1532            body: value.body.clone(),
1533        }
1534    }
1535}