greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
9use crate::component_api::{
10    ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::imports::register_all;
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, WasmResult};
15use anyhow::{Context, Result, anyhow, bail};
16use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
17use greentic_interfaces_host::host_import::v0_2 as host_import_v0_2;
18use greentic_interfaces_host::host_import::v0_2::greentic::host_import::imports::{
19    HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
20    IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
21};
22use greentic_interfaces_host::host_import::v0_6::{
23    self as host_import_v0_6, iface_types, state, types,
24};
25use greentic_pack::reader::{SigningPolicy, open_pack};
26use greentic_session::SessionKey as StoreSessionKey;
27use greentic_types::{
28    EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
29    TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
30};
31use indexmap::IndexMap;
32use once_cell::sync::Lazy;
33use parking_lot::Mutex;
34use reqwest::blocking::Client as BlockingClient;
35use serde::{Deserialize, Serialize};
36use serde_cbor;
37use serde_json::{self, Value};
38use serde_yaml_bw as serde_yaml;
39use tokio::fs;
40use wasmparser::{Parser, Payload};
41use wasmtime::StoreContextMut;
42use zip::ZipArchive;
43
44use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
45
46use crate::config::HostConfig;
47use crate::secrets::{DynSecretsManager, read_secret_blocking};
48use crate::storage::state::STATE_PREFIX;
49use crate::storage::{DynSessionStore, DynStateStore};
50use crate::verify;
51use crate::wasi::RunnerWasiPolicy;
52use tracing::warn;
53use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
54
/// A loaded pack together with the host services its components run against.
///
/// Instances are produced by [`PackRuntime::load`].
#[allow(dead_code)]
pub struct PackRuntime {
    /// Path handed to `load`; treated as a single component when its
    /// extension is `wasm`, otherwise as a pack archive.
    path: PathBuf,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Engine used to compile and link this pack's components.
    engine: Engine,
    /// Pack metadata taken from the manifest, the legacy flow cache, the wasm
    /// binary, or a path-based fallback, depending on what `load` could parse.
    metadata: PackMetadata,
    /// Pack manifest; `Some` only when `open_pack` succeeded during `load`.
    manifest: Option<greentic_pack::builder::PackManifest>,
    /// Optional mock layer supplied by the caller.
    mocks: Option<Arc<MockLayer>>,
    /// Legacy flow cache; populated only when the manifest could not be parsed
    /// but `PackFlows::from_archive` succeeded.
    flows: Option<PackFlows>,
    /// Compiled components, looked up by the reference passed to
    /// `invoke_component`.
    components: HashMap<String, PackComponent>,
    /// Blocking HTTP client cloned from the `HTTP_CLIENT` static.
    http_client: Arc<BlockingClient>,
    /// Cache of `ComponentPre` values keyed by component reference.
    pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
    /// Optional session store supplied by the caller.
    session_store: Option<DynSessionStore>,
    /// Optional state store supplied by the caller.
    state_store: Option<DynStateStore>,
    /// WASI policy supplied by the caller.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Secrets manager supplied by the caller.
    secrets: DynSecretsManager,
    /// OAuth broker configuration; its presence is passed to `register_all`
    /// when a component linker is built.
    oauth_config: Option<OAuthBrokerConfig>,
}
73
/// A compiled component belonging to a pack.
struct PackComponent {
    /// Component name (the file stem when a single `.wasm` file is loaded).
    #[allow(dead_code)]
    name: String,
    /// Component version (taken from the pack metadata when a single `.wasm`
    /// file is loaded).
    #[allow(dead_code)]
    version: String,
    /// The compiled component itself.
    component: Component,
}
81
82fn build_blocking_client() -> BlockingClient {
83    std::thread::spawn(|| BlockingClient::builder().build().expect("blocking client"))
84        .join()
85        .expect("client build thread panicked")
86}
87
/// Process-wide blocking HTTP client, built lazily on first access via
/// `build_blocking_client`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
89
/// Serializable summary of a flow exposed by a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile name (the manifest's `meta.name` when built from a manifest).
    pub profile: String,
    /// Version string (the manifest's `meta.version` when built from a manifest).
    pub version: String,
    /// Optional description; a missing field deserializes to `None`.
    #[serde(default)]
    pub description: Option<String>,
}
100
/// Host-side services backing the import interfaces offered to guest
/// components.
pub struct HostState {
    /// Shared host configuration (tenant, secrets policy, HTTP switch).
    config: Arc<HostConfig>,
    /// Blocking client used for outbound `http_fetch` calls.
    http_client: Arc<BlockingClient>,
    /// Environment name read from `GREENTIC_ENV`, defaulting to `"local"`.
    default_env: String,
    /// Optional session store; session calls report `Unavailable` without it.
    session_store: Option<DynSessionStore>,
    /// Optional state store; state calls report `Unavailable` without it.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted for secrets, telemetry and HTTP.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used when no mock supplies the value.
    secrets: DynSecretsManager,
    /// OAuth broker configuration, if any.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state, default-constructed in `new`.
    oauth_host: OAuthBrokerHost,
}
112
113impl HostState {
114    #[allow(clippy::default_constructed_unit_structs)]
115    pub fn new(
116        config: Arc<HostConfig>,
117        http_client: Arc<BlockingClient>,
118        mocks: Option<Arc<MockLayer>>,
119        session_store: Option<DynSessionStore>,
120        state_store: Option<DynStateStore>,
121        secrets: DynSecretsManager,
122        oauth_config: Option<OAuthBrokerConfig>,
123    ) -> Result<Self> {
124        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
125        Ok(Self {
126            config,
127            http_client,
128            default_env,
129            session_store,
130            state_store,
131            mocks,
132            secrets,
133            oauth_config,
134            oauth_host: OAuthBrokerHost::default(),
135        })
136    }
137
138    pub fn get_secret(&self, key: &str) -> Result<String> {
139        if !self.config.secrets_policy.is_allowed(key) {
140            bail!("secret {key} is not permitted by bindings policy");
141        }
142        if let Some(mock) = &self.mocks
143            && let Some(value) = mock.secrets_lookup(key)
144        {
145            return Ok(value);
146        }
147        let bytes = read_secret_blocking(&self.secrets, key)
148            .context("failed to read secret from manager")?;
149        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
150        Ok(value)
151    }
152
153    fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
154        let tenant_raw = ctx
155            .as_ref()
156            .map(|ctx| ctx.tenant.clone())
157            .unwrap_or_else(|| self.config.tenant.clone());
158        let tenant_id = TenantId::from_str(&tenant_raw)
159            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
160        let env_id = EnvId::from_str(&self.default_env)
161            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
162        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
163        if let Some(ctx) = ctx {
164            if let Some(team) = ctx.team {
165                let team_id =
166                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
167                tenant_ctx = tenant_ctx.with_team(Some(team_id));
168            }
169            if let Some(user) = ctx.user {
170                let user_id =
171                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
172                tenant_ctx = tenant_ctx.with_user(Some(user_id));
173            }
174            if let Some(flow) = ctx.flow_id {
175                tenant_ctx = tenant_ctx.with_flow(flow);
176            }
177            if let Some(node) = ctx.node_id {
178                tenant_ctx = tenant_ctx.with_node(node);
179            }
180            if let Some(provider) = ctx.provider_id {
181                tenant_ctx = tenant_ctx.with_provider(provider);
182            }
183            if let Some(session) = ctx.session_id {
184                tenant_ctx = tenant_ctx.with_session(session);
185            }
186            tenant_ctx.trace_id = ctx.trace_id;
187        }
188        Ok(tenant_ctx)
189    }
190
191    fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
192        self.session_store
193            .as_ref()
194            .cloned()
195            .ok_or(types::IfaceError::Unavailable)
196    }
197
198    fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
199        self.state_store
200            .as_ref()
201            .cloned()
202            .ok_or(types::IfaceError::Unavailable)
203    }
204
205    fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
206        ctx.user_id
207            .clone()
208            .or_else(|| ctx.user.clone())
209            .ok_or(types::IfaceError::InvalidArg)
210    }
211
212    fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
213        let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
214        FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
215    }
216
217    fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
218        let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
219        if let Some(reason) = cursor.wait_reason {
220            store_cursor = store_cursor.with_wait_reason(reason);
221        }
222        if let Some(marker) = cursor.outbox_marker {
223            store_cursor = store_cursor.with_outbox_marker(marker);
224        }
225        store_cursor
226    }
227}
228
/// Per-store state for a guest component: the host services plus the WASI
/// context and resource table handed out through `WasiView`.
pub struct ComponentState {
    /// Host services that back the import interfaces.
    host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table paired with the WASI context.
    resource_table: ResourceTable,
}
234
235impl ComponentState {
236    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
237        let wasi_ctx = policy
238            .instantiate()
239            .context("failed to build WASI context")?;
240        Ok(Self {
241            host,
242            wasi_ctx,
243            resource_table: ResourceTable::new(),
244        })
245    }
246
247    fn host_mut(&mut self) -> &mut HostState {
248        &mut self.host
249    }
250}
251
/// Control hooks exposed to guest components. Cancellation is never requested
/// and yielding does nothing.
impl control::Host for ComponentState {
    /// Always reports that no cancellation is pending.
    fn should_cancel(&mut self) -> bool {
        false
    }

    /// Accepts the guest's yield request without doing any work.
    fn yield_now(&mut self) {
        // no-op cooperative yield
    }
}
261
262fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
263    let mut inst = linker.instance("greentic:component/control@0.4.0")?;
264    inst.func_wrap(
265        "should-cancel",
266        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
267            let host = caller.data_mut();
268            Ok((ComponentControlHost::should_cancel(host),))
269        },
270    )?;
271    inst.func_wrap(
272        "yield-now",
273        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
274            let host = caller.data_mut();
275            ComponentControlHost::yield_now(host);
276            Ok(())
277        },
278    )?;
279    Ok(())
280}
281
/// Exposes the tenant, environment and OAuth broker state held by the wrapped
/// `HostState` to the OAuth host bindings.
impl OAuthHostContext for ComponentState {
    /// Tenant configured for this host.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// Default environment name of this host.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the OAuth broker host state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, if one was supplied.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
299
300impl WasiView for ComponentState {
301    fn ctx(&mut self) -> WasiCtxView<'_> {
302        WasiCtxView {
303            ctx: &mut self.wasi_ctx,
304            table: &mut self.resource_table,
305        }
306    }
307}
308
// SAFETY: NOTE(review): no invariant is documented here. These impls assert
// that every field of `ComponentState` (`HostState`, `WasiCtx`,
// `ResourceTable`) may be moved to and shared between threads — TODO confirm
// this holds, or drop the impls if the auto traits already apply.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
313
314impl host_import_v0_6::HostImports for ComponentState {
315    fn secrets_get(
316        &mut self,
317        key: String,
318        ctx: Option<types::TenantCtx>,
319    ) -> WasmResult<Result<String, types::IfaceError>> {
320        host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
321    }
322
323    fn telemetry_emit(
324        &mut self,
325        span_json: String,
326        ctx: Option<types::TenantCtx>,
327    ) -> WasmResult<()> {
328        host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
329    }
330
331    fn http_fetch(
332        &mut self,
333        req: host_import_v0_6::http::HttpRequest,
334        ctx: Option<types::TenantCtx>,
335    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
336        host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
337    }
338
339    fn mcp_exec(
340        &mut self,
341        component: String,
342        action: String,
343        args_json: String,
344        ctx: Option<types::TenantCtx>,
345    ) -> WasmResult<Result<String, types::IfaceError>> {
346        host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
347    }
348
349    fn state_get(
350        &mut self,
351        key: iface_types::StateKey,
352        ctx: Option<types::TenantCtx>,
353    ) -> WasmResult<Result<String, types::IfaceError>> {
354        host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
355    }
356
357    fn state_set(
358        &mut self,
359        key: iface_types::StateKey,
360        value_json: String,
361        ctx: Option<types::TenantCtx>,
362    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
363        host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
364    }
365
366    fn session_update(
367        &mut self,
368        cursor: iface_types::SessionCursor,
369        ctx: Option<types::TenantCtx>,
370    ) -> WasmResult<Result<String, types::IfaceError>> {
371        host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
372    }
373}
374
375impl host_import_v0_2::HostImports for ComponentState {
376    fn secrets_get(
377        &mut self,
378        key: String,
379        ctx: Option<LegacyTenantCtx>,
380    ) -> WasmResult<Result<String, LegacyIfaceError>> {
381        host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
382    }
383
384    fn telemetry_emit(
385        &mut self,
386        span_json: String,
387        ctx: Option<LegacyTenantCtx>,
388    ) -> WasmResult<()> {
389        host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
390    }
391
392    fn tool_invoke(
393        &mut self,
394        tool: String,
395        action: String,
396        args_json: String,
397        ctx: Option<LegacyTenantCtx>,
398    ) -> WasmResult<Result<String, LegacyIfaceError>> {
399        host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
400    }
401
402    fn http_fetch(
403        &mut self,
404        req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
405        ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
406    ) -> WasmResult<
407        Result<
408            host_import_v0_2::greentic::host_import::imports::HttpResponse,
409            host_import_v0_2::greentic::host_import::imports::IfaceError,
410        >,
411    > {
412        host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
413    }
414}
415
416impl host_import_v0_6::HostImports for HostState {
417    fn secrets_get(
418        &mut self,
419        key: String,
420        _ctx: Option<types::TenantCtx>,
421    ) -> WasmResult<Result<String, types::IfaceError>> {
422        Ok(self.get_secret(&key).map_err(|err| {
423            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
424            types::IfaceError::Denied
425        }))
426    }
427
428    fn telemetry_emit(
429        &mut self,
430        span_json: String,
431        _ctx: Option<types::TenantCtx>,
432    ) -> WasmResult<()> {
433        if let Some(mock) = &self.mocks
434            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
435        {
436            return Ok(());
437        }
438        tracing::info!(span = %span_json, "telemetry emit from pack");
439        Ok(())
440    }
441
442    fn http_fetch(
443        &mut self,
444        req: host_import_v0_6::http::HttpRequest,
445        _ctx: Option<types::TenantCtx>,
446    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
447        let legacy_req = LegacyHttpRequest {
448            method: req.method,
449            url: req.url,
450            headers_json: req.headers_json,
451            body: req.body,
452        };
453        match host_import_v0_2::HostImports::http_fetch(self, legacy_req, None)? {
454            Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
455                status: resp.status,
456                headers_json: resp.headers_json,
457                body: resp.body,
458            })),
459            Err(err) => Ok(Err(map_legacy_error(err))),
460        }
461    }
462
463    fn mcp_exec(
464        &mut self,
465        component: String,
466        action: String,
467        args_json: String,
468        ctx: Option<types::TenantCtx>,
469    ) -> WasmResult<Result<String, types::IfaceError>> {
470        let _ = (component, action, args_json, ctx);
471        tracing::warn!("mcp.exec requested but MCP bridge is removed; returning unavailable");
472        Ok(Err(types::IfaceError::Unavailable))
473    }
474
475    fn state_get(
476        &mut self,
477        key: iface_types::StateKey,
478        ctx: Option<types::TenantCtx>,
479    ) -> WasmResult<Result<String, types::IfaceError>> {
480        let store = match self.state_store_handle() {
481            Ok(store) => store,
482            Err(err) => return Ok(Err(err)),
483        };
484        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
485            Ok(ctx) => ctx,
486            Err(err) => {
487                tracing::warn!(error = %err, "invalid tenant context for state.get");
488                return Ok(Err(types::IfaceError::InvalidArg));
489            }
490        };
491        let key = StoreStateKey::from(key);
492        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
493            Ok(Some(value)) => {
494                let result = if let Some(text) = value.as_str() {
495                    text.to_string()
496                } else {
497                    serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
498                };
499                Ok(Ok(result))
500            }
501            Ok(None) => Ok(Err(types::IfaceError::NotFound)),
502            Err(err) => {
503                tracing::warn!(error = %err, "state.get failed");
504                Ok(Err(types::IfaceError::Internal))
505            }
506        }
507    }
508
509    fn state_set(
510        &mut self,
511        key: iface_types::StateKey,
512        value_json: String,
513        ctx: Option<types::TenantCtx>,
514    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
515        let store = match self.state_store_handle() {
516            Ok(store) => store,
517            Err(err) => return Ok(Err(err)),
518        };
519        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
520            Ok(ctx) => ctx,
521            Err(err) => {
522                tracing::warn!(error = %err, "invalid tenant context for state.set");
523                return Ok(Err(types::IfaceError::InvalidArg));
524            }
525        };
526        let key = StoreStateKey::from(key);
527        let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
528        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
529            Ok(()) => Ok(Ok(state::OpAck::Ok)),
530            Err(err) => {
531                tracing::warn!(error = %err, "state.set failed");
532                Ok(Err(types::IfaceError::Internal))
533            }
534        }
535    }
536
537    fn session_update(
538        &mut self,
539        cursor: iface_types::SessionCursor,
540        ctx: Option<types::TenantCtx>,
541    ) -> WasmResult<Result<String, types::IfaceError>> {
542        let store = match self.session_store_handle() {
543            Ok(store) => store,
544            Err(err) => return Ok(Err(err)),
545        };
546        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
547            Ok(ctx) => ctx,
548            Err(err) => {
549                tracing::warn!(error = %err, "invalid tenant context for session.update");
550                return Ok(Err(types::IfaceError::InvalidArg));
551            }
552        };
553        let user = match Self::ensure_user(&tenant_ctx) {
554            Ok(user) => user,
555            Err(err) => return Ok(Err(err)),
556        };
557        let flow_id = match Self::ensure_flow(&tenant_ctx) {
558            Ok(flow) => flow,
559            Err(err) => return Ok(Err(err)),
560        };
561        let cursor = Self::cursor_from_iface(cursor);
562        let payload = SessionData {
563            tenant_ctx: tenant_ctx.clone(),
564            flow_id,
565            cursor: cursor.clone(),
566            context_json: serde_json::json!({
567                "node_pointer": cursor.node_pointer,
568                "wait_reason": cursor.wait_reason,
569                "outbox_marker": cursor.outbox_marker,
570            })
571            .to_string(),
572        };
573        if let Some(existing) = tenant_ctx.session_id() {
574            let key = StoreSessionKey::from(existing.to_string());
575            if let Err(err) = store.update_session(&key, payload) {
576                tracing::error!(error = %err, "failed to update session snapshot");
577                return Ok(Err(types::IfaceError::Internal));
578            }
579            return Ok(Ok(existing.to_string()));
580        }
581        match store.find_by_user(&tenant_ctx, &user) {
582            Ok(Some((key, _))) => {
583                if let Err(err) = store.update_session(&key, payload) {
584                    tracing::error!(error = %err, "failed to update existing user session");
585                    return Ok(Err(types::IfaceError::Internal));
586                }
587                return Ok(Ok(key.to_string()));
588            }
589            Ok(None) => {}
590            Err(err) => {
591                tracing::error!(error = %err, "session lookup failed");
592                return Ok(Err(types::IfaceError::Internal));
593            }
594        }
595        let key = match store.create_session(&tenant_ctx, payload.clone()) {
596            Ok(key) => key,
597            Err(err) => {
598                tracing::error!(error = %err, "failed to create session");
599                return Ok(Err(types::IfaceError::Internal));
600            }
601        };
602        let ctx_with_session = tenant_ctx.with_session(key.to_string());
603        let updated_payload = SessionData {
604            tenant_ctx: ctx_with_session.clone(),
605            ..payload
606        };
607        if let Err(err) = store.update_session(&key, updated_payload) {
608            tracing::warn!(error = %err, "failed to stamp session id after create");
609        }
610        Ok(Ok(key.to_string()))
611    }
612}
613
614impl host_import_v0_2::HostImports for HostState {
615    fn secrets_get(
616        &mut self,
617        key: String,
618        _ctx: Option<LegacyTenantCtx>,
619    ) -> WasmResult<Result<String, LegacyIfaceError>> {
620        Ok(self.get_secret(&key).map_err(|err| {
621            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
622            LegacyIfaceError::Denied
623        }))
624    }
625
626    fn telemetry_emit(
627        &mut self,
628        span_json: String,
629        _ctx: Option<LegacyTenantCtx>,
630    ) -> WasmResult<()> {
631        if let Some(mock) = &self.mocks
632            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
633        {
634            return Ok(());
635        }
636        tracing::info!(span = %span_json, "telemetry emit from pack");
637        Ok(())
638    }
639
640    fn tool_invoke(
641        &mut self,
642        tool: String,
643        action: String,
644        args_json: String,
645        ctx: Option<LegacyTenantCtx>,
646    ) -> WasmResult<Result<String, LegacyIfaceError>> {
647        let _ = (tool, action, args_json, ctx);
648        tracing::warn!("tool invoke requested but MCP bridge is removed; returning unavailable");
649        Ok(Err(LegacyIfaceError::Unavailable))
650    }
651
652    fn http_fetch(
653        &mut self,
654        req: LegacyHttpRequest,
655        _ctx: Option<LegacyTenantCtx>,
656    ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
657        if !self.config.http_enabled {
658            tracing::warn!(url = %req.url, "http fetch denied by policy");
659            return Ok(Err(LegacyIfaceError::Denied));
660        }
661
662        let mut mock_state = None;
663        let raw_body = req.body.clone();
664        if let Some(mock) = &self.mocks
665            && let Ok(meta) = HttpMockRequest::new(
666                &req.method,
667                &req.url,
668                raw_body.as_deref().map(|body| body.as_bytes()),
669            )
670        {
671            match mock.http_begin(&meta) {
672                HttpDecision::Mock(response) => {
673                    let http = LegacyHttpResponse::from(&response);
674                    return Ok(Ok(http));
675                }
676                HttpDecision::Deny(reason) => {
677                    tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
678                    return Ok(Err(LegacyIfaceError::Denied));
679                }
680                HttpDecision::Passthrough { record } => {
681                    mock_state = Some((meta, record));
682                }
683            }
684        }
685
686        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
687        let mut builder = self.http_client.request(method, &req.url);
688
689        if let Some(headers_json) = req.headers_json.as_ref() {
690            match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
691                Ok(map) => {
692                    for (key, value) in map {
693                        if let Some(val) = value.as_str()
694                            && let Ok(header) =
695                                reqwest::header::HeaderName::from_bytes(key.as_bytes())
696                            && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
697                        {
698                            builder = builder.header(header, header_value);
699                        }
700                    }
701                }
702                Err(err) => {
703                    tracing::warn!(error = %err, "failed to parse headers for http.fetch");
704                }
705            }
706        }
707
708        if let Some(body) = raw_body.clone() {
709            builder = builder.body(body);
710        }
711
712        let response = match builder.send() {
713            Ok(resp) => resp,
714            Err(err) => {
715                tracing::error!(url = %req.url, error = %err, "http fetch failed");
716                return Ok(Err(LegacyIfaceError::Unavailable));
717            }
718        };
719
720        let status = response.status().as_u16();
721        let headers_map = response
722            .headers()
723            .iter()
724            .map(|(k, v)| {
725                (
726                    k.as_str().to_string(),
727                    v.to_str().unwrap_or_default().to_string(),
728                )
729            })
730            .collect::<BTreeMap<_, _>>();
731        let headers_json = serde_json::to_string(&headers_map).ok();
732        let body = response.text().ok();
733
734        if let Some((meta, true)) = mock_state.take()
735            && let Some(mock) = &self.mocks
736        {
737            let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
738            mock.http_record(&meta, &recorded);
739        }
740
741        Ok(Ok(LegacyHttpResponse {
742            status,
743            headers_json,
744            body,
745        }))
746    }
747}
748
749impl PackRuntime {
750    #[allow(clippy::too_many_arguments)]
751    pub async fn load(
752        path: impl AsRef<Path>,
753        config: Arc<HostConfig>,
754        mocks: Option<Arc<MockLayer>>,
755        archive_source: Option<&Path>,
756        session_store: Option<DynSessionStore>,
757        state_store: Option<DynStateStore>,
758        wasi_policy: Arc<RunnerWasiPolicy>,
759        secrets: DynSecretsManager,
760        oauth_config: Option<OAuthBrokerConfig>,
761        verify_archive: bool,
762    ) -> Result<Self> {
763        let path = path.as_ref();
764        let is_component = path
765            .extension()
766            .and_then(|ext| ext.to_str())
767            .map(|ext| ext.eq_ignore_ascii_case("wasm"))
768            .unwrap_or(false);
769        let archive_hint = archive_source.or(if is_component { None } else { Some(path) });
770        if verify_archive {
771            let verify_target = archive_hint.unwrap_or(path);
772            verify::verify_pack(verify_target).await?;
773            tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
774        }
775        let engine = Engine::default();
776        let wasm_bytes = fs::read(path).await?;
777        let mut metadata =
778            PackMetadata::from_wasm(&wasm_bytes).unwrap_or_else(|| PackMetadata::fallback(path));
779        let mut manifest = None;
780        let flows = if let Some(archive_path) = archive_hint {
781            match open_pack(archive_path, SigningPolicy::DevOk) {
782                Ok(pack) => {
783                    metadata = PackMetadata::from_manifest(&pack.manifest);
784                    manifest = Some(pack.manifest);
785                    None
786                }
787                Err(err) => {
788                    warn!(error = ?err, pack = %archive_path.display(), "failed to parse pack manifest; falling back to legacy flow cache");
789                    match PackFlows::from_archive(archive_path) {
790                        Ok(cache) => {
791                            metadata = cache.metadata.clone();
792                            Some(cache)
793                        }
794                        Err(err) => {
795                            warn!(
796                                error = %err,
797                                pack = %archive_path.display(),
798                                "failed to parse pack flows via greentic-pack; falling back to component exports"
799                            );
800                            None
801                        }
802                    }
803                }
804            }
805        } else {
806            None
807        };
808        let components = if let Some(archive_path) = archive_hint {
809            match load_components_from_archive(&engine, archive_path) {
810                Ok(map) => map,
811                Err(err) => {
812                    warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
813                    HashMap::new()
814                }
815            }
816        } else if is_component {
817            let name = path
818                .file_stem()
819                .map(|s| s.to_string_lossy().to_string())
820                .unwrap_or_else(|| "component".to_string());
821            let component = Component::from_binary(&engine, &wasm_bytes)?;
822            let mut map = HashMap::new();
823            map.insert(
824                name.clone(),
825                PackComponent {
826                    name,
827                    version: metadata.version.clone(),
828                    component,
829                },
830            );
831            map
832        } else {
833            HashMap::new()
834        };
835        let http_client = Arc::clone(&HTTP_CLIENT);
836        Ok(Self {
837            path: path.to_path_buf(),
838            config,
839            engine,
840            metadata,
841            manifest,
842            mocks,
843            flows,
844            components,
845            http_client,
846            pre_cache: Mutex::new(HashMap::new()),
847            session_store,
848            state_store,
849            wasi_policy,
850            secrets,
851            oauth_config,
852        })
853    }
854
855    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
856        if let Some(cache) = &self.flows {
857            return Ok(cache.descriptors.clone());
858        }
859        if let Some(manifest) = &self.manifest {
860            let descriptors = manifest
861                .flows
862                .iter()
863                .map(|flow| FlowDescriptor {
864                    id: flow.id.clone(),
865                    flow_type: flow.kind.clone(),
866                    profile: manifest.meta.name.clone(),
867                    version: manifest.meta.version.to_string(),
868                    description: Some(flow.kind.clone()),
869                })
870                .collect();
871            return Ok(descriptors);
872        }
873        Ok(Vec::new())
874    }
875
876    #[allow(dead_code)]
877    pub async fn run_flow(
878        &self,
879        _flow_id: &str,
880        _input: serde_json::Value,
881    ) -> Result<serde_json::Value> {
882        // TODO: dispatch flow execution via Wasmtime
883        Ok(serde_json::json!({}))
884    }
885
886    pub async fn invoke_component(
887        &self,
888        component_ref: &str,
889        ctx: ComponentExecCtx,
890        operation: &str,
891        _config_json: Option<String>,
892        input_json: String,
893    ) -> Result<Value> {
894        let pack_component = self
895            .components
896            .get(component_ref)
897            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
898
899        let pre = if let Some(pre) = self.pre_cache.lock().get(component_ref).cloned() {
900            pre
901        } else {
902            let mut linker = Linker::new(&self.engine);
903            register_all(&mut linker, self.oauth_config.is_some())?;
904            add_component_control_to_linker(&mut linker)?;
905            let pre = ComponentPre::new(
906                linker
907                    .instantiate_pre(&pack_component.component)
908                    .map_err(|err| anyhow!(err))?,
909            )
910            .map_err(|err| anyhow!(err))?;
911            self.pre_cache
912                .lock()
913                .insert(component_ref.to_string(), pre.clone());
914            pre
915        };
916
917        let host_state = HostState::new(
918            Arc::clone(&self.config),
919            Arc::clone(&self.http_client),
920            self.mocks.clone(),
921            self.session_store.clone(),
922            self.state_store.clone(),
923            Arc::clone(&self.secrets),
924            self.oauth_config.clone(),
925        )?;
926        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
927        let mut store = wasmtime::Store::new(&self.engine, store_state);
928        let bindings = pre
929            .instantiate_async(&mut store)
930            .await
931            .map_err(|err| anyhow!(err))?;
932        let node = bindings.greentic_component_node();
933
934        let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
935
936        match result {
937            InvokeResult::Ok(body) => {
938                if body.is_empty() {
939                    return Ok(Value::Null);
940                }
941                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
942            }
943            InvokeResult::Err(NodeError {
944                code,
945                message,
946                retryable,
947                backoff_ms,
948                details,
949            }) => {
950                let mut obj = serde_json::Map::new();
951                obj.insert("ok".into(), Value::Bool(false));
952                let mut error = serde_json::Map::new();
953                error.insert("code".into(), Value::String(code));
954                error.insert("message".into(), Value::String(message));
955                error.insert("retryable".into(), Value::Bool(retryable));
956                if let Some(backoff) = backoff_ms {
957                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
958                }
959                if let Some(details) = details {
960                    error.insert(
961                        "details".into(),
962                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
963                    );
964                }
965                obj.insert("error".into(), Value::Object(error));
966                Ok(Value::Object(obj))
967            }
968        }
969    }
970
971    pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
972        if let Some(cache) = &self.flows {
973            return cache
974                .flows
975                .get(flow_id)
976                .cloned()
977                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
978        }
979        if let Some(manifest) = &self.manifest {
980            let entry = manifest
981                .flows
982                .iter()
983                .find(|f| f.id == flow_id)
984                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
985            let path = &self.path;
986            let mut archive = ZipArchive::new(File::open(path)?)?;
987            match load_flow_doc(&mut archive, entry).and_then(|doc| {
988                greentic_flow::to_ir(doc.clone())
989                    .with_context(|| format!("failed to build IR for flow {}", doc.id))
990            }) {
991                Ok(ir) => return Ok(ir),
992                Err(err) => {
993                    warn!(
994                        error = %err,
995                        flow_id = flow_id,
996                        "failed to parse flow from archive; falling back to stub IR"
997                    );
998                    return Ok(build_stub_flow_ir(&entry.id, &entry.kind));
999                }
1000            }
1001        }
1002        bail!("flow '{flow_id}' not available (pack exports disabled)")
1003    }
1004
1005    pub fn metadata(&self) -> &PackMetadata {
1006        &self.metadata
1007    }
1008
1009    pub fn for_component_test(
1010        components: Vec<(String, PathBuf)>,
1011        flows: HashMap<String, FlowIR>,
1012        config: Arc<HostConfig>,
1013    ) -> Result<Self> {
1014        let engine = Engine::default();
1015        let mut component_map = HashMap::new();
1016        for (name, path) in components {
1017            if !path.exists() {
1018                bail!("component artifact missing: {}", path.display());
1019            }
1020            let wasm_bytes = std::fs::read(&path)?;
1021            let component = Component::from_binary(&engine, &wasm_bytes)
1022                .with_context(|| format!("failed to compile component {}", path.display()))?;
1023            component_map.insert(
1024                name.clone(),
1025                PackComponent {
1026                    name,
1027                    version: "0.0.0".into(),
1028                    component,
1029                },
1030            );
1031        }
1032
1033        let descriptors = flows
1034            .iter()
1035            .map(|(id, ir)| FlowDescriptor {
1036                id: id.clone(),
1037                flow_type: ir.flow_type.clone(),
1038                profile: "test".into(),
1039                version: "0.0.0".into(),
1040                description: None,
1041            })
1042            .collect::<Vec<_>>();
1043        let flows_cache = PackFlows {
1044            descriptors: descriptors.clone(),
1045            flows,
1046            metadata: PackMetadata::fallback(Path::new("component-test")),
1047        };
1048
1049        Ok(Self {
1050            path: PathBuf::new(),
1051            config,
1052            engine,
1053            metadata: PackMetadata::fallback(Path::new("component-test")),
1054            manifest: None,
1055            mocks: None,
1056            flows: Some(flows_cache),
1057            components: component_map,
1058            http_client: Arc::clone(&HTTP_CLIENT),
1059            pre_cache: Mutex::new(HashMap::new()),
1060            session_store: None,
1061            state_store: None,
1062            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1063            secrets: crate::secrets::default_manager(),
1064            oauth_config: None,
1065        })
1066    }
1067}
1068
1069fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
1070    match err {
1071        LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
1072        LegacyIfaceError::NotFound => types::IfaceError::NotFound,
1073        LegacyIfaceError::Denied => types::IfaceError::Denied,
1074        LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
1075        LegacyIfaceError::Internal => types::IfaceError::Internal,
1076    }
1077}
1078
1079struct PackFlows {
1080    descriptors: Vec<FlowDescriptor>,
1081    flows: HashMap<String, FlowIR>,
1082    metadata: PackMetadata,
1083}
1084
1085impl PackFlows {
1086    fn from_archive(path: &Path) -> Result<Self> {
1087        let pack = open_pack(path, SigningPolicy::DevOk)
1088            .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1089        let file = File::open(path)
1090            .with_context(|| format!("failed to open archive {}", path.display()))?;
1091        let mut archive = ZipArchive::new(file)
1092            .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1093        let mut flows = HashMap::new();
1094        let mut descriptors = Vec::new();
1095        for flow in &pack.manifest.flows {
1096            match load_flow_entry(&mut archive, flow, &pack.manifest.meta) {
1097                Ok((descriptor, ir)) => {
1098                    flows.insert(descriptor.id.clone(), ir);
1099                    descriptors.push(descriptor);
1100                }
1101                Err(err) => {
1102                    warn!(
1103                        error = %err,
1104                        flow_id = flow.id,
1105                        "failed to parse flow metadata from archive; using stub flow"
1106                    );
1107                    let stub = build_stub_flow_ir(&flow.id, &flow.kind);
1108                    flows.insert(flow.id.clone(), stub.clone());
1109                    descriptors.push(FlowDescriptor {
1110                        id: flow.id.clone(),
1111                        flow_type: flow.kind.clone(),
1112                        profile: pack.manifest.meta.name.clone(),
1113                        version: pack.manifest.meta.version.to_string(),
1114                        description: Some(flow.kind.clone()),
1115                    });
1116                }
1117            }
1118        }
1119
1120        Ok(Self {
1121            metadata: PackMetadata::from_manifest(&pack.manifest),
1122            descriptors,
1123            flows,
1124        })
1125    }
1126}
1127
1128fn load_flow_entry(
1129    archive: &mut ZipArchive<File>,
1130    entry: &greentic_pack::builder::FlowEntry,
1131    meta: &greentic_pack::builder::PackMeta,
1132) -> Result<(FlowDescriptor, FlowIR)> {
1133    let doc = load_flow_doc(archive, entry)?;
1134    let ir = greentic_flow::to_ir(doc.clone())
1135        .with_context(|| format!("failed to build IR for flow {}", doc.id))?;
1136    let descriptor = FlowDescriptor {
1137        id: doc.id.clone(),
1138        flow_type: doc.flow_type.clone(),
1139        profile: meta.name.clone(),
1140        version: meta.version.to_string(),
1141        description: doc
1142            .description
1143            .clone()
1144            .or(doc.title.clone())
1145            .or_else(|| Some(entry.kind.clone())),
1146    };
1147    Ok((descriptor, ir))
1148}
1149
1150fn load_flow_doc(
1151    archive: &mut ZipArchive<File>,
1152    entry: &greentic_pack::builder::FlowEntry,
1153) -> Result<greentic_flow::model::FlowDoc> {
1154    if let Ok(bytes) = read_entry(archive, &entry.file_yaml)
1155        && let Ok(doc) = serde_yaml::from_slice(&bytes)
1156    {
1157        return Ok(doc);
1158    }
1159    let json_bytes = read_entry(archive, &entry.file_json)
1160        .with_context(|| format!("missing flow document {}", entry.file_json))?;
1161    let doc = serde_json::from_slice(&json_bytes)
1162        .with_context(|| format!("failed to parse {}", entry.file_json))?;
1163    Ok(doc)
1164}
1165
1166fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1167    let mut file = archive
1168        .by_name(name)
1169        .with_context(|| format!("entry {name} missing from archive"))?;
1170    let mut buf = Vec::new();
1171    file.read_to_end(&mut buf)?;
1172    Ok(buf)
1173}
1174
1175fn load_components_from_archive(
1176    engine: &Engine,
1177    path: &Path,
1178) -> Result<HashMap<String, PackComponent>> {
1179    let pack = open_pack(path, SigningPolicy::DevOk)
1180        .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1181    let mut archive = ZipArchive::new(File::open(path)?)?;
1182    let mut components = HashMap::new();
1183    for entry in &pack.manifest.components {
1184        let bytes = read_entry(&mut archive, &entry.file_wasm)
1185            .with_context(|| format!("missing component {}", entry.file_wasm))?;
1186        let component = Component::from_binary(engine, &bytes)
1187            .with_context(|| format!("failed to compile component {}", entry.name))?;
1188        components.insert(
1189            entry.name.clone(),
1190            PackComponent {
1191                name: entry.name.clone(),
1192                version: entry.version.to_string(),
1193                component,
1194            },
1195        );
1196    }
1197    Ok(components)
1198}
1199
1200#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1201pub struct PackMetadata {
1202    pub pack_id: String,
1203    pub version: String,
1204    #[serde(default)]
1205    pub entry_flows: Vec<String>,
1206}
1207
1208impl PackMetadata {
1209    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1210        let parser = Parser::new(0);
1211        for payload in parser.parse_all(bytes) {
1212            let payload = payload.ok()?;
1213            match payload {
1214                Payload::CustomSection(section) => {
1215                    if section.name() == "greentic.manifest"
1216                        && let Ok(meta) = Self::from_bytes(section.data())
1217                    {
1218                        return Some(meta);
1219                    }
1220                }
1221                Payload::DataSection(reader) => {
1222                    for segment in reader.into_iter().flatten() {
1223                        if let Ok(meta) = Self::from_bytes(segment.data) {
1224                            return Some(meta);
1225                        }
1226                    }
1227                }
1228                _ => {}
1229            }
1230        }
1231        None
1232    }
1233
1234    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1235        #[derive(Deserialize)]
1236        struct RawManifest {
1237            pack_id: String,
1238            version: String,
1239            #[serde(default)]
1240            entry_flows: Vec<String>,
1241            #[serde(default)]
1242            flows: Vec<RawFlow>,
1243        }
1244
1245        #[derive(Deserialize)]
1246        struct RawFlow {
1247            id: String,
1248        }
1249
1250        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1251        let mut entry_flows = if manifest.entry_flows.is_empty() {
1252            manifest.flows.iter().map(|f| f.id.clone()).collect()
1253        } else {
1254            manifest.entry_flows.clone()
1255        };
1256        entry_flows.retain(|id| !id.is_empty());
1257        Ok(Self {
1258            pack_id: manifest.pack_id,
1259            version: manifest.version,
1260            entry_flows,
1261        })
1262    }
1263
1264    pub fn fallback(path: &Path) -> Self {
1265        let pack_id = path
1266            .file_stem()
1267            .map(|s| s.to_string_lossy().into_owned())
1268            .unwrap_or_else(|| "unknown-pack".to_string());
1269        Self {
1270            pack_id,
1271            version: "0.0.0".to_string(),
1272            entry_flows: Vec::new(),
1273        }
1274    }
1275
1276    pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1277        let entry_flows = if manifest.meta.entry_flows.is_empty() {
1278            manifest
1279                .flows
1280                .iter()
1281                .map(|flow| flow.id.clone())
1282                .collect::<Vec<_>>()
1283        } else {
1284            manifest.meta.entry_flows.clone()
1285        };
1286        Self {
1287            pack_id: manifest.meta.pack_id.clone(),
1288            version: manifest.meta.version.to_string(),
1289            entry_flows,
1290        }
1291    }
1292}
1293
1294fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1295    let mut nodes = IndexMap::new();
1296    nodes.insert(
1297        "complete".into(),
1298        NodeIR {
1299            component: "component.exec".into(),
1300            payload_expr: serde_json::json!({
1301                "component": "qa.process",
1302                "operation": "process",
1303                "input": {
1304                    "status": "done",
1305                    "flow_id": flow_id,
1306                }
1307            }),
1308            routes: vec![RouteIR {
1309                to: None,
1310                out: true,
1311            }],
1312        },
1313    );
1314    FlowIR {
1315        id: flow_id.to_string(),
1316        flow_type: flow_type.to_string(),
1317        start: Some("complete".into()),
1318        parameters: Value::Object(Default::default()),
1319        nodes,
1320    }
1321}
1322
1323impl From<&HttpMockResponse> for LegacyHttpResponse {
1324    fn from(value: &HttpMockResponse) -> Self {
1325        let headers_json = serde_json::to_string(&value.headers).ok();
1326        Self {
1327            status: value.status,
1328            headers_json,
1329            body: value.body.clone(),
1330        }
1331    }
1332}