greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
9use crate::component_api::{
10    ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::imports::register_all;
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, WasmResult};
15use anyhow::{Context, Result, anyhow, bail};
16use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
17use greentic_flow::model::FlowDoc;
18use greentic_interfaces_host::host_import::v0_2 as host_import_v0_2;
19use greentic_interfaces_host::host_import::v0_2::greentic::host_import::imports::{
20    HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
21    IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
22};
23use greentic_interfaces_host::host_import::v0_6::{
24    self as host_import_v0_6, iface_types, state, types,
25};
26use greentic_pack::reader::{SigningPolicy, open_pack};
27use greentic_session::SessionKey as StoreSessionKey;
28use greentic_types::{
29    EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
30    TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
31};
32use indexmap::IndexMap;
33use once_cell::sync::Lazy;
34use parking_lot::Mutex;
35use reqwest::blocking::Client as BlockingClient;
36use runner_core::normalize_under_root;
37use serde::{Deserialize, Serialize};
38use serde_cbor;
39use serde_json::{self, Value};
40use serde_yaml_bw as serde_yaml;
41use tokio::fs;
42use wasmparser::{Parser, Payload};
43use wasmtime::StoreContextMut;
44use zip::ZipArchive;
45
46use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
47
48use crate::config::HostConfig;
49use crate::secrets::{DynSecretsManager, read_secret_blocking};
50use crate::storage::state::STATE_PREFIX;
51use crate::storage::{DynSessionStore, DynStateStore};
52use crate::verify;
53use crate::wasi::RunnerWasiPolicy;
54use tracing::warn;
55use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
56
57#[allow(dead_code)]
58pub struct PackRuntime {
59    /// Component artifact path (wasm file).
60    path: PathBuf,
61    /// Optional archive (.gtpack) used to load flows/manifests.
62    archive_path: Option<PathBuf>,
63    config: Arc<HostConfig>,
64    engine: Engine,
65    metadata: PackMetadata,
66    manifest: Option<greentic_pack::builder::PackManifest>,
67    mocks: Option<Arc<MockLayer>>,
68    flows: Option<PackFlows>,
69    components: HashMap<String, PackComponent>,
70    http_client: Arc<BlockingClient>,
71    pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
72    session_store: Option<DynSessionStore>,
73    state_store: Option<DynStateStore>,
74    wasi_policy: Arc<RunnerWasiPolicy>,
75    secrets: DynSecretsManager,
76    oauth_config: Option<OAuthBrokerConfig>,
77}
78
79struct PackComponent {
80    #[allow(dead_code)]
81    name: String,
82    #[allow(dead_code)]
83    version: String,
84    component: Component,
85}
86
87fn build_blocking_client() -> BlockingClient {
88    std::thread::spawn(|| BlockingClient::builder().build().expect("blocking client"))
89        .join()
90        .expect("client build thread panicked")
91}
92
93fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
94    let (root, candidate) = if path.is_absolute() {
95        let parent = path
96            .parent()
97            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
98        let root = parent
99            .canonicalize()
100            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
101        let file = path
102            .file_name()
103            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
104        (root, PathBuf::from(file))
105    } else {
106        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
107        let base = if let Some(parent) = path.parent() {
108            cwd.join(parent)
109        } else {
110            cwd
111        };
112        let root = base
113            .canonicalize()
114            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
115        let file = path
116            .file_name()
117            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
118        (root, PathBuf::from(file))
119    };
120    let safe = normalize_under_root(&root, &candidate)?;
121    Ok((root, safe))
122}
123
124static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct FlowDescriptor {
128    pub id: String,
129    #[serde(rename = "type")]
130    pub flow_type: String,
131    pub profile: String,
132    pub version: String,
133    #[serde(default)]
134    pub description: Option<String>,
135}
136
137pub struct HostState {
138    config: Arc<HostConfig>,
139    http_client: Arc<BlockingClient>,
140    default_env: String,
141    session_store: Option<DynSessionStore>,
142    state_store: Option<DynStateStore>,
143    mocks: Option<Arc<MockLayer>>,
144    secrets: DynSecretsManager,
145    oauth_config: Option<OAuthBrokerConfig>,
146    oauth_host: OAuthBrokerHost,
147}
148
149impl HostState {
150    #[allow(clippy::default_constructed_unit_structs)]
151    pub fn new(
152        config: Arc<HostConfig>,
153        http_client: Arc<BlockingClient>,
154        mocks: Option<Arc<MockLayer>>,
155        session_store: Option<DynSessionStore>,
156        state_store: Option<DynStateStore>,
157        secrets: DynSecretsManager,
158        oauth_config: Option<OAuthBrokerConfig>,
159    ) -> Result<Self> {
160        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
161        Ok(Self {
162            config,
163            http_client,
164            default_env,
165            session_store,
166            state_store,
167            mocks,
168            secrets,
169            oauth_config,
170            oauth_host: OAuthBrokerHost::default(),
171        })
172    }
173
174    pub fn get_secret(&self, key: &str) -> Result<String> {
175        if !self.config.secrets_policy.is_allowed(key) {
176            bail!("secret {key} is not permitted by bindings policy");
177        }
178        if let Some(mock) = &self.mocks
179            && let Some(value) = mock.secrets_lookup(key)
180        {
181            return Ok(value);
182        }
183        let bytes = read_secret_blocking(&self.secrets, key)
184            .context("failed to read secret from manager")?;
185        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
186        Ok(value)
187    }
188
189    fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
190        let tenant_raw = ctx
191            .as_ref()
192            .map(|ctx| ctx.tenant.clone())
193            .unwrap_or_else(|| self.config.tenant.clone());
194        let tenant_id = TenantId::from_str(&tenant_raw)
195            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
196        let env_id = EnvId::from_str(&self.default_env)
197            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
198        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
199        if let Some(ctx) = ctx {
200            if let Some(team) = ctx.team {
201                let team_id =
202                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
203                tenant_ctx = tenant_ctx.with_team(Some(team_id));
204            }
205            if let Some(user) = ctx.user {
206                let user_id =
207                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
208                tenant_ctx = tenant_ctx.with_user(Some(user_id));
209            }
210            if let Some(flow) = ctx.flow_id {
211                tenant_ctx = tenant_ctx.with_flow(flow);
212            }
213            if let Some(node) = ctx.node_id {
214                tenant_ctx = tenant_ctx.with_node(node);
215            }
216            if let Some(provider) = ctx.provider_id {
217                tenant_ctx = tenant_ctx.with_provider(provider);
218            }
219            if let Some(session) = ctx.session_id {
220                tenant_ctx = tenant_ctx.with_session(session);
221            }
222            tenant_ctx.trace_id = ctx.trace_id;
223        }
224        Ok(tenant_ctx)
225    }
226
227    fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
228        self.session_store
229            .as_ref()
230            .cloned()
231            .ok_or(types::IfaceError::Unavailable)
232    }
233
234    fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
235        self.state_store
236            .as_ref()
237            .cloned()
238            .ok_or(types::IfaceError::Unavailable)
239    }
240
241    fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
242        ctx.user_id
243            .clone()
244            .or_else(|| ctx.user.clone())
245            .ok_or(types::IfaceError::InvalidArg)
246    }
247
248    fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
249        let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
250        FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
251    }
252
253    fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
254        let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
255        if let Some(reason) = cursor.wait_reason {
256            store_cursor = store_cursor.with_wait_reason(reason);
257        }
258        if let Some(marker) = cursor.outbox_marker {
259            store_cursor = store_cursor.with_outbox_marker(marker);
260        }
261        store_cursor
262    }
263}
264
265pub struct ComponentState {
266    host: HostState,
267    wasi_ctx: WasiCtx,
268    resource_table: ResourceTable,
269}
270
271impl ComponentState {
272    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
273        let wasi_ctx = policy
274            .instantiate()
275            .context("failed to build WASI context")?;
276        Ok(Self {
277            host,
278            wasi_ctx,
279            resource_table: ResourceTable::new(),
280        })
281    }
282
283    fn host_mut(&mut self) -> &mut HostState {
284        &mut self.host
285    }
286}
287
288impl control::Host for ComponentState {
289    fn should_cancel(&mut self) -> bool {
290        false
291    }
292
293    fn yield_now(&mut self) {
294        // no-op cooperative yield
295    }
296}
297
298fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
299    let mut inst = linker.instance("greentic:component/control@0.4.0")?;
300    inst.func_wrap(
301        "should-cancel",
302        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
303            let host = caller.data_mut();
304            Ok((ComponentControlHost::should_cancel(host),))
305        },
306    )?;
307    inst.func_wrap(
308        "yield-now",
309        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
310            let host = caller.data_mut();
311            ComponentControlHost::yield_now(host);
312            Ok(())
313        },
314    )?;
315    Ok(())
316}
317
318impl OAuthHostContext for ComponentState {
319    fn tenant_id(&self) -> &str {
320        &self.host.config.tenant
321    }
322
323    fn env(&self) -> &str {
324        &self.host.default_env
325    }
326
327    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
328        &mut self.host.oauth_host
329    }
330
331    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
332        self.host.oauth_config.as_ref()
333    }
334}
335
336impl WasiView for ComponentState {
337    fn ctx(&mut self) -> WasiCtxView<'_> {
338        WasiCtxView {
339            ctx: &mut self.wasi_ctx,
340            table: &mut self.resource_table,
341        }
342    }
343}
344
345#[allow(unsafe_code)]
346unsafe impl Send for ComponentState {}
347#[allow(unsafe_code)]
348unsafe impl Sync for ComponentState {}
349
350impl host_import_v0_6::HostImports for ComponentState {
351    fn secrets_get(
352        &mut self,
353        key: String,
354        ctx: Option<types::TenantCtx>,
355    ) -> WasmResult<Result<String, types::IfaceError>> {
356        host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
357    }
358
359    fn telemetry_emit(
360        &mut self,
361        span_json: String,
362        ctx: Option<types::TenantCtx>,
363    ) -> WasmResult<()> {
364        host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
365    }
366
367    fn http_fetch(
368        &mut self,
369        req: host_import_v0_6::http::HttpRequest,
370        ctx: Option<types::TenantCtx>,
371    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
372        host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
373    }
374
375    fn mcp_exec(
376        &mut self,
377        component: String,
378        action: String,
379        args_json: String,
380        ctx: Option<types::TenantCtx>,
381    ) -> WasmResult<Result<String, types::IfaceError>> {
382        host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
383    }
384
385    fn state_get(
386        &mut self,
387        key: iface_types::StateKey,
388        ctx: Option<types::TenantCtx>,
389    ) -> WasmResult<Result<String, types::IfaceError>> {
390        host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
391    }
392
393    fn state_set(
394        &mut self,
395        key: iface_types::StateKey,
396        value_json: String,
397        ctx: Option<types::TenantCtx>,
398    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
399        host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
400    }
401
402    fn session_update(
403        &mut self,
404        cursor: iface_types::SessionCursor,
405        ctx: Option<types::TenantCtx>,
406    ) -> WasmResult<Result<String, types::IfaceError>> {
407        host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
408    }
409}
410
411impl host_import_v0_2::HostImports for ComponentState {
412    fn secrets_get(
413        &mut self,
414        key: String,
415        ctx: Option<LegacyTenantCtx>,
416    ) -> WasmResult<Result<String, LegacyIfaceError>> {
417        host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
418    }
419
420    fn telemetry_emit(
421        &mut self,
422        span_json: String,
423        ctx: Option<LegacyTenantCtx>,
424    ) -> WasmResult<()> {
425        host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
426    }
427
428    fn tool_invoke(
429        &mut self,
430        tool: String,
431        action: String,
432        args_json: String,
433        ctx: Option<LegacyTenantCtx>,
434    ) -> WasmResult<Result<String, LegacyIfaceError>> {
435        host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
436    }
437
438    fn http_fetch(
439        &mut self,
440        req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
441        ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
442    ) -> WasmResult<
443        Result<
444            host_import_v0_2::greentic::host_import::imports::HttpResponse,
445            host_import_v0_2::greentic::host_import::imports::IfaceError,
446        >,
447    > {
448        host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
449    }
450}
451
452impl host_import_v0_6::HostImports for HostState {
453    fn secrets_get(
454        &mut self,
455        key: String,
456        _ctx: Option<types::TenantCtx>,
457    ) -> WasmResult<Result<String, types::IfaceError>> {
458        Ok(self.get_secret(&key).map_err(|err| {
459            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
460            types::IfaceError::Denied
461        }))
462    }
463
464    fn telemetry_emit(
465        &mut self,
466        span_json: String,
467        _ctx: Option<types::TenantCtx>,
468    ) -> WasmResult<()> {
469        if let Some(mock) = &self.mocks
470            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
471        {
472            return Ok(());
473        }
474        tracing::info!(span = %span_json, "telemetry emit from pack");
475        Ok(())
476    }
477
478    fn http_fetch(
479        &mut self,
480        req: host_import_v0_6::http::HttpRequest,
481        _ctx: Option<types::TenantCtx>,
482    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
483        let legacy_req = LegacyHttpRequest {
484            method: req.method,
485            url: req.url,
486            headers_json: req.headers_json,
487            body: req.body,
488        };
489        match host_import_v0_2::HostImports::http_fetch(self, legacy_req, None)? {
490            Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
491                status: resp.status,
492                headers_json: resp.headers_json,
493                body: resp.body,
494            })),
495            Err(err) => Ok(Err(map_legacy_error(err))),
496        }
497    }
498
499    fn mcp_exec(
500        &mut self,
501        component: String,
502        action: String,
503        args_json: String,
504        ctx: Option<types::TenantCtx>,
505    ) -> WasmResult<Result<String, types::IfaceError>> {
506        let _ = (component, action, args_json, ctx);
507        tracing::warn!("mcp.exec requested but MCP bridge is removed; returning unavailable");
508        Ok(Err(types::IfaceError::Unavailable))
509    }
510
511    fn state_get(
512        &mut self,
513        key: iface_types::StateKey,
514        ctx: Option<types::TenantCtx>,
515    ) -> WasmResult<Result<String, types::IfaceError>> {
516        let store = match self.state_store_handle() {
517            Ok(store) => store,
518            Err(err) => return Ok(Err(err)),
519        };
520        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
521            Ok(ctx) => ctx,
522            Err(err) => {
523                tracing::warn!(error = %err, "invalid tenant context for state.get");
524                return Ok(Err(types::IfaceError::InvalidArg));
525            }
526        };
527        let key = StoreStateKey::from(key);
528        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
529            Ok(Some(value)) => {
530                let result = if let Some(text) = value.as_str() {
531                    text.to_string()
532                } else {
533                    serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
534                };
535                Ok(Ok(result))
536            }
537            Ok(None) => Ok(Err(types::IfaceError::NotFound)),
538            Err(err) => {
539                tracing::warn!(error = %err, "state.get failed");
540                Ok(Err(types::IfaceError::Internal))
541            }
542        }
543    }
544
545    fn state_set(
546        &mut self,
547        key: iface_types::StateKey,
548        value_json: String,
549        ctx: Option<types::TenantCtx>,
550    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
551        let store = match self.state_store_handle() {
552            Ok(store) => store,
553            Err(err) => return Ok(Err(err)),
554        };
555        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
556            Ok(ctx) => ctx,
557            Err(err) => {
558                tracing::warn!(error = %err, "invalid tenant context for state.set");
559                return Ok(Err(types::IfaceError::InvalidArg));
560            }
561        };
562        let key = StoreStateKey::from(key);
563        let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
564        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
565            Ok(()) => Ok(Ok(state::OpAck::Ok)),
566            Err(err) => {
567                tracing::warn!(error = %err, "state.set failed");
568                Ok(Err(types::IfaceError::Internal))
569            }
570        }
571    }
572
573    fn session_update(
574        &mut self,
575        cursor: iface_types::SessionCursor,
576        ctx: Option<types::TenantCtx>,
577    ) -> WasmResult<Result<String, types::IfaceError>> {
578        let store = match self.session_store_handle() {
579            Ok(store) => store,
580            Err(err) => return Ok(Err(err)),
581        };
582        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
583            Ok(ctx) => ctx,
584            Err(err) => {
585                tracing::warn!(error = %err, "invalid tenant context for session.update");
586                return Ok(Err(types::IfaceError::InvalidArg));
587            }
588        };
589        let user = match Self::ensure_user(&tenant_ctx) {
590            Ok(user) => user,
591            Err(err) => return Ok(Err(err)),
592        };
593        let flow_id = match Self::ensure_flow(&tenant_ctx) {
594            Ok(flow) => flow,
595            Err(err) => return Ok(Err(err)),
596        };
597        let cursor = Self::cursor_from_iface(cursor);
598        let payload = SessionData {
599            tenant_ctx: tenant_ctx.clone(),
600            flow_id,
601            cursor: cursor.clone(),
602            context_json: serde_json::json!({
603                "node_pointer": cursor.node_pointer,
604                "wait_reason": cursor.wait_reason,
605                "outbox_marker": cursor.outbox_marker,
606            })
607            .to_string(),
608        };
609        if let Some(existing) = tenant_ctx.session_id() {
610            let key = StoreSessionKey::from(existing.to_string());
611            if let Err(err) = store.update_session(&key, payload) {
612                tracing::error!(error = %err, "failed to update session snapshot");
613                return Ok(Err(types::IfaceError::Internal));
614            }
615            return Ok(Ok(existing.to_string()));
616        }
617        match store.find_by_user(&tenant_ctx, &user) {
618            Ok(Some((key, _))) => {
619                if let Err(err) = store.update_session(&key, payload) {
620                    tracing::error!(error = %err, "failed to update existing user session");
621                    return Ok(Err(types::IfaceError::Internal));
622                }
623                return Ok(Ok(key.to_string()));
624            }
625            Ok(None) => {}
626            Err(err) => {
627                tracing::error!(error = %err, "session lookup failed");
628                return Ok(Err(types::IfaceError::Internal));
629            }
630        }
631        let key = match store.create_session(&tenant_ctx, payload.clone()) {
632            Ok(key) => key,
633            Err(err) => {
634                tracing::error!(error = %err, "failed to create session");
635                return Ok(Err(types::IfaceError::Internal));
636            }
637        };
638        let ctx_with_session = tenant_ctx.with_session(key.to_string());
639        let updated_payload = SessionData {
640            tenant_ctx: ctx_with_session.clone(),
641            ..payload
642        };
643        if let Err(err) = store.update_session(&key, updated_payload) {
644            tracing::warn!(error = %err, "failed to stamp session id after create");
645        }
646        Ok(Ok(key.to_string()))
647    }
648}
649
650impl host_import_v0_2::HostImports for HostState {
651    fn secrets_get(
652        &mut self,
653        key: String,
654        _ctx: Option<LegacyTenantCtx>,
655    ) -> WasmResult<Result<String, LegacyIfaceError>> {
656        Ok(self.get_secret(&key).map_err(|err| {
657            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
658            LegacyIfaceError::Denied
659        }))
660    }
661
662    fn telemetry_emit(
663        &mut self,
664        span_json: String,
665        _ctx: Option<LegacyTenantCtx>,
666    ) -> WasmResult<()> {
667        if let Some(mock) = &self.mocks
668            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
669        {
670            return Ok(());
671        }
672        tracing::info!(span = %span_json, "telemetry emit from pack");
673        Ok(())
674    }
675
676    fn tool_invoke(
677        &mut self,
678        tool: String,
679        action: String,
680        args_json: String,
681        ctx: Option<LegacyTenantCtx>,
682    ) -> WasmResult<Result<String, LegacyIfaceError>> {
683        let _ = (tool, action, args_json, ctx);
684        tracing::warn!("tool invoke requested but MCP bridge is removed; returning unavailable");
685        Ok(Err(LegacyIfaceError::Unavailable))
686    }
687
688    fn http_fetch(
689        &mut self,
690        req: LegacyHttpRequest,
691        _ctx: Option<LegacyTenantCtx>,
692    ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
693        if !self.config.http_enabled {
694            tracing::warn!(url = %req.url, "http fetch denied by policy");
695            return Ok(Err(LegacyIfaceError::Denied));
696        }
697
698        let mut mock_state = None;
699        let raw_body = req.body.clone();
700        if let Some(mock) = &self.mocks
701            && let Ok(meta) = HttpMockRequest::new(
702                &req.method,
703                &req.url,
704                raw_body.as_deref().map(|body| body.as_bytes()),
705            )
706        {
707            match mock.http_begin(&meta) {
708                HttpDecision::Mock(response) => {
709                    let http = LegacyHttpResponse::from(&response);
710                    return Ok(Ok(http));
711                }
712                HttpDecision::Deny(reason) => {
713                    tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
714                    return Ok(Err(LegacyIfaceError::Denied));
715                }
716                HttpDecision::Passthrough { record } => {
717                    mock_state = Some((meta, record));
718                }
719            }
720        }
721
722        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
723        let mut builder = self.http_client.request(method, &req.url);
724
725        if let Some(headers_json) = req.headers_json.as_ref() {
726            match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
727                Ok(map) => {
728                    for (key, value) in map {
729                        if let Some(val) = value.as_str()
730                            && let Ok(header) =
731                                reqwest::header::HeaderName::from_bytes(key.as_bytes())
732                            && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
733                        {
734                            builder = builder.header(header, header_value);
735                        }
736                    }
737                }
738                Err(err) => {
739                    tracing::warn!(error = %err, "failed to parse headers for http.fetch");
740                }
741            }
742        }
743
744        if let Some(body) = raw_body.clone() {
745            builder = builder.body(body);
746        }
747
748        let response = match builder.send() {
749            Ok(resp) => resp,
750            Err(err) => {
751                tracing::error!(url = %req.url, error = %err, "http fetch failed");
752                return Ok(Err(LegacyIfaceError::Unavailable));
753            }
754        };
755
756        let status = response.status().as_u16();
757        let headers_map = response
758            .headers()
759            .iter()
760            .map(|(k, v)| {
761                (
762                    k.as_str().to_string(),
763                    v.to_str().unwrap_or_default().to_string(),
764                )
765            })
766            .collect::<BTreeMap<_, _>>();
767        let headers_json = serde_json::to_string(&headers_map).ok();
768        let body = response.text().ok();
769
770        if let Some((meta, true)) = mock_state.take()
771            && let Some(mock) = &self.mocks
772        {
773            let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
774            mock.http_record(&meta, &recorded);
775        }
776
777        Ok(Ok(LegacyHttpResponse {
778            status,
779            headers_json,
780            body,
781        }))
782    }
783}
784
785impl PackRuntime {
786    #[allow(clippy::too_many_arguments)]
787    pub async fn load(
788        path: impl AsRef<Path>,
789        config: Arc<HostConfig>,
790        mocks: Option<Arc<MockLayer>>,
791        archive_source: Option<&Path>,
792        session_store: Option<DynSessionStore>,
793        state_store: Option<DynStateStore>,
794        wasi_policy: Arc<RunnerWasiPolicy>,
795        secrets: DynSecretsManager,
796        oauth_config: Option<OAuthBrokerConfig>,
797        verify_archive: bool,
798    ) -> Result<Self> {
799        let path = path.as_ref();
800        let (_pack_root, safe_path) = normalize_pack_path(path)?;
801        let is_component = safe_path
802            .extension()
803            .and_then(|ext| ext.to_str())
804            .map(|ext| ext.eq_ignore_ascii_case("wasm"))
805            .unwrap_or(false);
806        let archive_hint_path = if let Some(source) = archive_source {
807            let (_, normalized) = normalize_pack_path(source)?;
808            Some(normalized)
809        } else if is_component {
810            None
811        } else {
812            Some(safe_path.clone())
813        };
814        let archive_hint = archive_hint_path.as_deref();
815        if verify_archive {
816            let verify_target = archive_hint.unwrap_or(&safe_path);
817            verify::verify_pack(verify_target).await?;
818            tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
819        }
820        let engine = Engine::default();
821        let wasm_bytes = fs::read(&safe_path).await?;
822        let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
823            .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
824        let mut manifest = None;
825        let flows = if let Some(archive_path) = archive_hint {
826            match open_pack(archive_path, SigningPolicy::DevOk) {
827                Ok(pack) => {
828                    metadata = PackMetadata::from_manifest(&pack.manifest);
829                    manifest = Some(pack.manifest);
830                    None
831                }
832                Err(err) => {
833                    warn!(error = ?err, pack = %archive_path.display(), "failed to parse pack manifest; falling back to legacy flow cache");
834                    match PackFlows::from_archive(archive_path) {
835                        Ok(cache) => {
836                            metadata = cache.metadata.clone();
837                            Some(cache)
838                        }
839                        Err(err) => {
840                            warn!(
841                                error = %err,
842                                pack = %archive_path.display(),
843                                "failed to parse pack flows via greentic-pack; falling back to component exports"
844                            );
845                            None
846                        }
847                    }
848                }
849            }
850        } else {
851            None
852        };
853        let components = if let Some(archive_path) = archive_hint {
854            match load_components_from_archive(&engine, archive_path) {
855                Ok(map) => map,
856                Err(err) => {
857                    warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
858                    HashMap::new()
859                }
860            }
861        } else if is_component {
862            let name = safe_path
863                .file_stem()
864                .map(|s| s.to_string_lossy().to_string())
865                .unwrap_or_else(|| "component".to_string());
866            let component = Component::from_binary(&engine, &wasm_bytes)?;
867            let mut map = HashMap::new();
868            map.insert(
869                name.clone(),
870                PackComponent {
871                    name,
872                    version: metadata.version.clone(),
873                    component,
874                },
875            );
876            map
877        } else {
878            HashMap::new()
879        };
880        let http_client = Arc::clone(&HTTP_CLIENT);
881        Ok(Self {
882            path: safe_path,
883            archive_path: archive_hint.map(Path::to_path_buf),
884            config,
885            engine,
886            metadata,
887            manifest,
888            mocks,
889            flows,
890            components,
891            http_client,
892            pre_cache: Mutex::new(HashMap::new()),
893            session_store,
894            state_store,
895            wasi_policy,
896            secrets,
897            oauth_config,
898        })
899    }
900
901    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
902        if let Some(cache) = &self.flows {
903            return Ok(cache.descriptors.clone());
904        }
905        if let Some(manifest) = &self.manifest {
906            let descriptors = manifest
907                .flows
908                .iter()
909                .map(|flow| FlowDescriptor {
910                    id: flow.id.clone(),
911                    flow_type: flow.kind.clone(),
912                    profile: manifest.meta.name.clone(),
913                    version: manifest.meta.version.to_string(),
914                    description: Some(flow.kind.clone()),
915                })
916                .collect();
917            return Ok(descriptors);
918        }
919        Ok(Vec::new())
920    }
921
922    #[allow(dead_code)]
923    pub async fn run_flow(
924        &self,
925        _flow_id: &str,
926        _input: serde_json::Value,
927    ) -> Result<serde_json::Value> {
928        // TODO: dispatch flow execution via Wasmtime
929        Ok(serde_json::json!({}))
930    }
931
932    pub async fn invoke_component(
933        &self,
934        component_ref: &str,
935        ctx: ComponentExecCtx,
936        operation: &str,
937        _config_json: Option<String>,
938        input_json: String,
939    ) -> Result<Value> {
940        let pack_component = self
941            .components
942            .get(component_ref)
943            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
944
945        let pre = if let Some(pre) = self.pre_cache.lock().get(component_ref).cloned() {
946            pre
947        } else {
948            let mut linker = Linker::new(&self.engine);
949            register_all(&mut linker, self.oauth_config.is_some())?;
950            add_component_control_to_linker(&mut linker)?;
951            let pre = ComponentPre::new(
952                linker
953                    .instantiate_pre(&pack_component.component)
954                    .map_err(|err| anyhow!(err))?,
955            )
956            .map_err(|err| anyhow!(err))?;
957            self.pre_cache
958                .lock()
959                .insert(component_ref.to_string(), pre.clone());
960            pre
961        };
962
963        let host_state = HostState::new(
964            Arc::clone(&self.config),
965            Arc::clone(&self.http_client),
966            self.mocks.clone(),
967            self.session_store.clone(),
968            self.state_store.clone(),
969            Arc::clone(&self.secrets),
970            self.oauth_config.clone(),
971        )?;
972        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
973        let mut store = wasmtime::Store::new(&self.engine, store_state);
974        let bindings = pre
975            .instantiate_async(&mut store)
976            .await
977            .map_err(|err| anyhow!(err))?;
978        let node = bindings.greentic_component_node();
979
980        let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
981
982        match result {
983            InvokeResult::Ok(body) => {
984                if body.is_empty() {
985                    return Ok(Value::Null);
986                }
987                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
988            }
989            InvokeResult::Err(NodeError {
990                code,
991                message,
992                retryable,
993                backoff_ms,
994                details,
995            }) => {
996                let mut obj = serde_json::Map::new();
997                obj.insert("ok".into(), Value::Bool(false));
998                let mut error = serde_json::Map::new();
999                error.insert("code".into(), Value::String(code));
1000                error.insert("message".into(), Value::String(message));
1001                error.insert("retryable".into(), Value::Bool(retryable));
1002                if let Some(backoff) = backoff_ms {
1003                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1004                }
1005                if let Some(details) = details {
1006                    error.insert(
1007                        "details".into(),
1008                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1009                    );
1010                }
1011                obj.insert("error".into(), Value::Object(error));
1012                Ok(Value::Object(obj))
1013            }
1014        }
1015    }
1016
1017    pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
1018        if let Some(cache) = &self.flows {
1019            return cache
1020                .flows
1021                .get(flow_id)
1022                .cloned()
1023                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1024        }
1025        if let Some(manifest) = &self.manifest {
1026            let entry = manifest
1027                .flows
1028                .iter()
1029                .find(|f| f.id == flow_id)
1030                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1031            let archive_path = self.archive_path.as_ref().unwrap_or(&self.path);
1032            let mut archive = ZipArchive::new(File::open(archive_path)?)?;
1033            match load_flow_doc(&mut archive, entry).and_then(|doc| {
1034                greentic_flow::to_ir(doc.clone())
1035                    .with_context(|| format!("failed to build IR for flow {}", doc.id))
1036            }) {
1037                Ok(ir) => return Ok(ir),
1038                Err(err) => {
1039                    warn!(
1040                        error = %err,
1041                        flow_id = flow_id,
1042                        "failed to parse flow from archive; falling back to stub IR"
1043                    );
1044                    return Ok(build_stub_flow_ir(&entry.id, &entry.kind));
1045                }
1046            }
1047        }
1048        bail!("flow '{flow_id}' not available (pack exports disabled)")
1049    }
1050
1051    pub fn metadata(&self) -> &PackMetadata {
1052        &self.metadata
1053    }
1054
1055    pub fn for_component_test(
1056        components: Vec<(String, PathBuf)>,
1057        flows: HashMap<String, FlowIR>,
1058        config: Arc<HostConfig>,
1059    ) -> Result<Self> {
1060        let engine = Engine::default();
1061        let mut component_map = HashMap::new();
1062        for (name, path) in components {
1063            if !path.exists() {
1064                bail!("component artifact missing: {}", path.display());
1065            }
1066            let wasm_bytes = std::fs::read(&path)?;
1067            let component = Component::from_binary(&engine, &wasm_bytes)
1068                .with_context(|| format!("failed to compile component {}", path.display()))?;
1069            component_map.insert(
1070                name.clone(),
1071                PackComponent {
1072                    name,
1073                    version: "0.0.0".into(),
1074                    component,
1075                },
1076            );
1077        }
1078
1079        let descriptors = flows
1080            .iter()
1081            .map(|(id, ir)| FlowDescriptor {
1082                id: id.clone(),
1083                flow_type: ir.flow_type.clone(),
1084                profile: "test".into(),
1085                version: "0.0.0".into(),
1086                description: None,
1087            })
1088            .collect::<Vec<_>>();
1089        let flows_cache = PackFlows {
1090            descriptors: descriptors.clone(),
1091            flows,
1092            metadata: PackMetadata::fallback(Path::new("component-test")),
1093        };
1094
1095        Ok(Self {
1096            path: PathBuf::new(),
1097            archive_path: None,
1098            config,
1099            engine,
1100            metadata: PackMetadata::fallback(Path::new("component-test")),
1101            manifest: None,
1102            mocks: None,
1103            flows: Some(flows_cache),
1104            components: component_map,
1105            http_client: Arc::clone(&HTTP_CLIENT),
1106            pre_cache: Mutex::new(HashMap::new()),
1107            session_store: None,
1108            state_store: None,
1109            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1110            secrets: crate::secrets::default_manager(),
1111            oauth_config: None,
1112        })
1113    }
1114}
1115
1116fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
1117    match err {
1118        LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
1119        LegacyIfaceError::NotFound => types::IfaceError::NotFound,
1120        LegacyIfaceError::Denied => types::IfaceError::Denied,
1121        LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
1122        LegacyIfaceError::Internal => types::IfaceError::Internal,
1123    }
1124}
1125
1126struct PackFlows {
1127    descriptors: Vec<FlowDescriptor>,
1128    flows: HashMap<String, FlowIR>,
1129    metadata: PackMetadata,
1130}
1131
1132impl PackFlows {
1133    fn from_archive(path: &Path) -> Result<Self> {
1134        let pack = open_pack(path, SigningPolicy::DevOk)
1135            .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1136        let file = File::open(path)
1137            .with_context(|| format!("failed to open archive {}", path.display()))?;
1138        let mut archive = ZipArchive::new(file)
1139            .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1140        let mut flows = HashMap::new();
1141        let mut descriptors = Vec::new();
1142        for flow in &pack.manifest.flows {
1143            match load_flow_entry(&mut archive, flow, &pack.manifest.meta) {
1144                Ok((descriptor, ir)) => {
1145                    flows.insert(descriptor.id.clone(), ir);
1146                    descriptors.push(descriptor);
1147                }
1148                Err(err) => {
1149                    warn!(
1150                        error = %err,
1151                        flow_id = flow.id,
1152                        "failed to parse flow metadata from archive; using stub flow"
1153                    );
1154                    let stub = build_stub_flow_ir(&flow.id, &flow.kind);
1155                    flows.insert(flow.id.clone(), stub.clone());
1156                    descriptors.push(FlowDescriptor {
1157                        id: flow.id.clone(),
1158                        flow_type: flow.kind.clone(),
1159                        profile: pack.manifest.meta.name.clone(),
1160                        version: pack.manifest.meta.version.to_string(),
1161                        description: Some(flow.kind.clone()),
1162                    });
1163                }
1164            }
1165        }
1166
1167        Ok(Self {
1168            metadata: PackMetadata::from_manifest(&pack.manifest),
1169            descriptors,
1170            flows,
1171        })
1172    }
1173}
1174
1175fn load_flow_entry(
1176    archive: &mut ZipArchive<File>,
1177    entry: &greentic_pack::builder::FlowEntry,
1178    meta: &greentic_pack::builder::PackMeta,
1179) -> Result<(FlowDescriptor, FlowIR)> {
1180    let doc = load_flow_doc(archive, entry)?;
1181    let ir = greentic_flow::to_ir(doc.clone())
1182        .with_context(|| format!("failed to build IR for flow {}", doc.id))?;
1183    let descriptor = FlowDescriptor {
1184        id: doc.id.clone(),
1185        flow_type: doc.flow_type.clone(),
1186        profile: meta.name.clone(),
1187        version: meta.version.to_string(),
1188        description: doc
1189            .description
1190            .clone()
1191            .or(doc.title.clone())
1192            .or_else(|| Some(entry.kind.clone())),
1193    };
1194    Ok((descriptor, ir))
1195}
1196
1197fn load_flow_doc(
1198    archive: &mut ZipArchive<File>,
1199    entry: &greentic_pack::builder::FlowEntry,
1200) -> Result<greentic_flow::model::FlowDoc> {
1201    if let Ok(bytes) = read_entry(archive, &entry.file_yaml)
1202        && let Ok(doc) = serde_yaml::from_slice(&bytes)
1203    {
1204        return Ok(normalize_flow_doc(doc));
1205    }
1206    let json_bytes = read_entry(archive, &entry.file_json)
1207        .with_context(|| format!("missing flow document {}", entry.file_json))?;
1208    let doc = serde_json::from_slice(&json_bytes)
1209        .with_context(|| format!("failed to parse {}", entry.file_json))?;
1210    Ok(normalize_flow_doc(doc))
1211}
1212
1213fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1214    let mut file = archive
1215        .by_name(name)
1216        .with_context(|| format!("entry {name} missing from archive"))?;
1217    let mut buf = Vec::new();
1218    file.read_to_end(&mut buf)?;
1219    Ok(buf)
1220}
1221
1222fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1223    for node in doc.nodes.values_mut() {
1224        if node.component.is_empty()
1225            && let Some((component_ref, payload)) = node.raw.iter().next()
1226        {
1227            let (operation, input, config) = infer_component_exec(payload, component_ref);
1228            let mut payload_obj = serde_json::Map::new();
1229            payload_obj.insert("component".into(), Value::String(component_ref.clone()));
1230            payload_obj.insert("operation".into(), Value::String(operation));
1231            payload_obj.insert("input".into(), input);
1232            if let Some(cfg) = config {
1233                payload_obj.insert("config".into(), cfg);
1234            }
1235            node.component = "component.exec".to_string();
1236            node.payload = Value::Object(payload_obj);
1237        }
1238    }
1239    doc
1240}
1241
1242fn infer_component_exec(payload: &Value, component_ref: &str) -> (String, Value, Option<Value>) {
1243    let default_op = if component_ref.starts_with("templating.") {
1244        "render"
1245    } else {
1246        "invoke"
1247    }
1248    .to_string();
1249
1250    if let Value::Object(map) = payload {
1251        let op = map
1252            .get("op")
1253            .or_else(|| map.get("operation"))
1254            .and_then(Value::as_str)
1255            .map(|s| s.to_string())
1256            .unwrap_or_else(|| default_op.clone());
1257
1258        let mut input = map.clone();
1259        let config = input.remove("config");
1260        input.remove("op");
1261        input.remove("operation");
1262        return (op, Value::Object(input), config);
1263    }
1264
1265    (default_op, payload.clone(), None)
1266}
1267
1268#[cfg(test)]
1269mod tests {
1270    use super::*;
1271    use greentic_flow::model::{FlowDoc, Node, Route};
1272    use serde_json::json;
1273    use std::collections::BTreeMap;
1274
1275    #[test]
1276    fn normalizes_raw_component_to_component_exec() {
1277        let mut nodes = BTreeMap::new();
1278        let mut raw = BTreeMap::new();
1279        raw.insert(
1280            "templating.handlebars".into(),
1281            json!({ "template": "Hi {{name}}" }),
1282        );
1283        nodes.insert(
1284            "start".into(),
1285            Node {
1286                raw,
1287                routing: vec![Route {
1288                    to: None,
1289                    out: Some(true),
1290                }],
1291                ..Default::default()
1292            },
1293        );
1294        let doc = FlowDoc {
1295            id: "welcome".into(),
1296            title: None,
1297            description: None,
1298            flow_type: "messaging".into(),
1299            start: Some("start".into()),
1300            parameters: json!({}),
1301            nodes,
1302        };
1303
1304        let normalized = normalize_flow_doc(doc);
1305        let node = normalized.nodes.get("start").expect("node exists");
1306        assert_eq!(node.component, "component.exec");
1307        assert!(node.raw.is_empty() || node.raw.contains_key("templating.handlebars"));
1308        let payload = node.payload.as_object().expect("payload object");
1309        assert_eq!(
1310            payload.get("component"),
1311            Some(&Value::String("templating.handlebars".into()))
1312        );
1313        assert_eq!(
1314            payload.get("operation"),
1315            Some(&Value::String("render".into()))
1316        );
1317        let input = payload.get("input").unwrap();
1318        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
1319    }
1320}
1321
1322fn load_components_from_archive(
1323    engine: &Engine,
1324    path: &Path,
1325) -> Result<HashMap<String, PackComponent>> {
1326    let pack = open_pack(path, SigningPolicy::DevOk)
1327        .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1328    let mut archive = ZipArchive::new(File::open(path)?)?;
1329    let mut components = HashMap::new();
1330    for entry in &pack.manifest.components {
1331        let bytes = read_entry(&mut archive, &entry.file_wasm)
1332            .with_context(|| format!("missing component {}", entry.file_wasm))?;
1333        let component = Component::from_binary(engine, &bytes)
1334            .with_context(|| format!("failed to compile component {}", entry.name))?;
1335        components.insert(
1336            entry.name.clone(),
1337            PackComponent {
1338                name: entry.name.clone(),
1339                version: entry.version.to_string(),
1340                component,
1341            },
1342        );
1343    }
1344    Ok(components)
1345}
1346
1347#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1348pub struct PackMetadata {
1349    pub pack_id: String,
1350    pub version: String,
1351    #[serde(default)]
1352    pub entry_flows: Vec<String>,
1353}
1354
1355impl PackMetadata {
1356    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1357        let parser = Parser::new(0);
1358        for payload in parser.parse_all(bytes) {
1359            let payload = payload.ok()?;
1360            match payload {
1361                Payload::CustomSection(section) => {
1362                    if section.name() == "greentic.manifest"
1363                        && let Ok(meta) = Self::from_bytes(section.data())
1364                    {
1365                        return Some(meta);
1366                    }
1367                }
1368                Payload::DataSection(reader) => {
1369                    for segment in reader.into_iter().flatten() {
1370                        if let Ok(meta) = Self::from_bytes(segment.data) {
1371                            return Some(meta);
1372                        }
1373                    }
1374                }
1375                _ => {}
1376            }
1377        }
1378        None
1379    }
1380
1381    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1382        #[derive(Deserialize)]
1383        struct RawManifest {
1384            pack_id: String,
1385            version: String,
1386            #[serde(default)]
1387            entry_flows: Vec<String>,
1388            #[serde(default)]
1389            flows: Vec<RawFlow>,
1390        }
1391
1392        #[derive(Deserialize)]
1393        struct RawFlow {
1394            id: String,
1395        }
1396
1397        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1398        let mut entry_flows = if manifest.entry_flows.is_empty() {
1399            manifest.flows.iter().map(|f| f.id.clone()).collect()
1400        } else {
1401            manifest.entry_flows.clone()
1402        };
1403        entry_flows.retain(|id| !id.is_empty());
1404        Ok(Self {
1405            pack_id: manifest.pack_id,
1406            version: manifest.version,
1407            entry_flows,
1408        })
1409    }
1410
1411    pub fn fallback(path: &Path) -> Self {
1412        let pack_id = path
1413            .file_stem()
1414            .map(|s| s.to_string_lossy().into_owned())
1415            .unwrap_or_else(|| "unknown-pack".to_string());
1416        Self {
1417            pack_id,
1418            version: "0.0.0".to_string(),
1419            entry_flows: Vec::new(),
1420        }
1421    }
1422
1423    pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1424        let entry_flows = if manifest.meta.entry_flows.is_empty() {
1425            manifest
1426                .flows
1427                .iter()
1428                .map(|flow| flow.id.clone())
1429                .collect::<Vec<_>>()
1430        } else {
1431            manifest.meta.entry_flows.clone()
1432        };
1433        Self {
1434            pack_id: manifest.meta.pack_id.clone(),
1435            version: manifest.meta.version.to_string(),
1436            entry_flows,
1437        }
1438    }
1439}
1440
1441fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1442    let mut nodes = IndexMap::new();
1443    nodes.insert(
1444        "complete".into(),
1445        NodeIR {
1446            component: "component.exec".into(),
1447            payload_expr: serde_json::json!({
1448                "component": "qa.process",
1449                "operation": "process",
1450                "input": {
1451                    "status": "done",
1452                    "flow_id": flow_id,
1453                }
1454            }),
1455            routes: vec![RouteIR {
1456                to: None,
1457                out: true,
1458            }],
1459        },
1460    );
1461    FlowIR {
1462        id: flow_id.to_string(),
1463        flow_type: flow_type.to_string(),
1464        start: Some("complete".into()),
1465        parameters: Value::Object(Default::default()),
1466        nodes,
1467    }
1468}
1469
1470impl From<&HttpMockResponse> for LegacyHttpResponse {
1471    fn from(value: &HttpMockResponse) -> Self {
1472        let headers_json = serde_json::to_string(&value.headers).ok();
1473        Self {
1474            status: value.status,
1475            headers_json,
1476            body: value.body.clone(),
1477        }
1478    }
1479}