greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, Store, WasmResult};
9use anyhow::{Context, Result, anyhow, bail};
10use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
11use greentic_interfaces::host_import_v0_2;
12use greentic_interfaces::host_import_v0_2::greentic::host_import::imports::{
13    HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
14    IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
15};
16use greentic_interfaces::host_import_v0_6::{self, iface_types, state, types};
17use greentic_interfaces::pack_export_v0_2;
18use greentic_interfaces::pack_export_v0_2::exports::greentic::pack_export::exports::FlowInfo;
19#[cfg(feature = "mcp")]
20use greentic_mcp::{ExecConfig, ExecError, ExecRequest};
21use greentic_pack::reader::{SigningPolicy, open_pack};
22use greentic_session::SessionKey as StoreSessionKey;
23use greentic_types::{
24    EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
25    TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
26};
27use indexmap::IndexMap;
28use reqwest::blocking::Client as BlockingClient;
29use serde::{Deserialize, Serialize};
30use serde_cbor;
31use serde_json::{self, Value};
32use serde_yaml_bw as serde_yaml;
33use tokio::fs;
34use wasmparser::{Parser, Payload};
35use zip::ZipArchive;
36
37use crate::imports;
38use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
39
40use crate::config::HostConfig;
41use crate::secrets::{DynSecretsManager, read_secret_blocking};
42use crate::storage::state::STATE_PREFIX;
43use crate::storage::{DynSessionStore, DynStateStore};
44use crate::verify;
45use crate::wasi::RunnerWasiPolicy;
46use tracing::warn;
47use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
48
49pub struct PackRuntime {
50    path: PathBuf,
51    config: Arc<HostConfig>,
52    engine: Engine,
53    component: Option<Component>,
54    metadata: PackMetadata,
55    mocks: Option<Arc<MockLayer>>,
56    flows: Option<PackFlows>,
57    session_store: Option<DynSessionStore>,
58    state_store: Option<DynStateStore>,
59    wasi_policy: Arc<RunnerWasiPolicy>,
60    secrets: DynSecretsManager,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct FlowDescriptor {
65    pub id: String,
66    #[serde(rename = "type")]
67    pub flow_type: String,
68    pub profile: String,
69    pub version: String,
70    #[serde(default)]
71    pub description: Option<String>,
72}
73
74pub struct HostState {
75    config: Arc<HostConfig>,
76    http_client: BlockingClient,
77    #[cfg(feature = "mcp")]
78    exec_config: Option<ExecConfig>,
79    default_env: String,
80    session_store: Option<DynSessionStore>,
81    state_store: Option<DynStateStore>,
82    mocks: Option<Arc<MockLayer>>,
83    secrets: DynSecretsManager,
84}
85
86impl HostState {
87    pub fn new(
88        config: Arc<HostConfig>,
89        mocks: Option<Arc<MockLayer>>,
90        session_store: Option<DynSessionStore>,
91        state_store: Option<DynStateStore>,
92        secrets: DynSecretsManager,
93    ) -> Result<Self> {
94        let http_client = BlockingClient::builder().build()?;
95        #[cfg(feature = "mcp")]
96        let exec_config = config.mcp_exec_config().ok();
97        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
98        Ok(Self {
99            config,
100            http_client,
101            #[cfg(feature = "mcp")]
102            exec_config,
103            default_env,
104            session_store,
105            state_store,
106            mocks,
107            secrets,
108        })
109    }
110
111    pub fn get_secret(&self, key: &str) -> Result<String> {
112        if !self.config.secrets_policy.is_allowed(key) {
113            bail!("secret {key} is not permitted by bindings policy");
114        }
115        if let Some(mock) = &self.mocks
116            && let Some(value) = mock.secrets_lookup(key)
117        {
118            return Ok(value);
119        }
120        let bytes = read_secret_blocking(&self.secrets, key)
121            .context("failed to read secret from manager")?;
122        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
123        Ok(value)
124    }
125
126    fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
127        let tenant_raw = ctx
128            .as_ref()
129            .map(|ctx| ctx.tenant.clone())
130            .unwrap_or_else(|| self.config.tenant.clone());
131        let tenant_id = TenantId::from_str(&tenant_raw)
132            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
133        let env_id = EnvId::from_str(&self.default_env)
134            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
135        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
136        if let Some(ctx) = ctx {
137            if let Some(team) = ctx.team {
138                let team_id =
139                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
140                tenant_ctx = tenant_ctx.with_team(Some(team_id));
141            }
142            if let Some(user) = ctx.user {
143                let user_id =
144                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
145                tenant_ctx = tenant_ctx.with_user(Some(user_id));
146            }
147            if let Some(flow) = ctx.flow_id {
148                tenant_ctx = tenant_ctx.with_flow(flow);
149            }
150            if let Some(node) = ctx.node_id {
151                tenant_ctx = tenant_ctx.with_node(node);
152            }
153            if let Some(provider) = ctx.provider_id {
154                tenant_ctx = tenant_ctx.with_provider(provider);
155            }
156            if let Some(session) = ctx.session_id {
157                tenant_ctx = tenant_ctx.with_session(session);
158            }
159            tenant_ctx.trace_id = ctx.trace_id;
160        }
161        Ok(tenant_ctx)
162    }
163
164    fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
165        self.session_store
166            .as_ref()
167            .cloned()
168            .ok_or(types::IfaceError::Unavailable)
169    }
170
171    fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
172        self.state_store
173            .as_ref()
174            .cloned()
175            .ok_or(types::IfaceError::Unavailable)
176    }
177
178    fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
179        ctx.user_id
180            .clone()
181            .or_else(|| ctx.user.clone())
182            .ok_or(types::IfaceError::InvalidArg)
183    }
184
185    fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
186        let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
187        FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
188    }
189
190    fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
191        let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
192        if let Some(reason) = cursor.wait_reason {
193            store_cursor = store_cursor.with_wait_reason(reason);
194        }
195        if let Some(marker) = cursor.outbox_marker {
196            store_cursor = store_cursor.with_outbox_marker(marker);
197        }
198        store_cursor
199    }
200}
201
202pub struct ComponentState {
203    host: HostState,
204    wasi_ctx: WasiCtx,
205    resource_table: ResourceTable,
206}
207
208impl ComponentState {
209    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
210        let wasi_ctx = policy
211            .instantiate()
212            .context("failed to build WASI context")?;
213        Ok(Self {
214            host,
215            wasi_ctx,
216            resource_table: ResourceTable::new(),
217        })
218    }
219
220    fn host_mut(&mut self) -> &mut HostState {
221        &mut self.host
222    }
223}
224
225impl WasiView for ComponentState {
226    fn ctx(&mut self) -> WasiCtxView<'_> {
227        WasiCtxView {
228            ctx: &mut self.wasi_ctx,
229            table: &mut self.resource_table,
230        }
231    }
232}
233
234#[allow(unsafe_code)]
235unsafe impl Send for ComponentState {}
236#[allow(unsafe_code)]
237unsafe impl Sync for ComponentState {}
238
239impl host_import_v0_6::HostImports for ComponentState {
240    fn secrets_get(
241        &mut self,
242        key: String,
243        ctx: Option<types::TenantCtx>,
244    ) -> WasmResult<Result<String, types::IfaceError>> {
245        host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
246    }
247
248    fn telemetry_emit(
249        &mut self,
250        span_json: String,
251        ctx: Option<types::TenantCtx>,
252    ) -> WasmResult<()> {
253        host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
254    }
255
256    fn http_fetch(
257        &mut self,
258        req: host_import_v0_6::http::HttpRequest,
259        ctx: Option<types::TenantCtx>,
260    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
261        host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
262    }
263
264    fn mcp_exec(
265        &mut self,
266        component: String,
267        action: String,
268        args_json: String,
269        ctx: Option<types::TenantCtx>,
270    ) -> WasmResult<Result<String, types::IfaceError>> {
271        host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
272    }
273
274    fn state_get(
275        &mut self,
276        key: iface_types::StateKey,
277        ctx: Option<types::TenantCtx>,
278    ) -> WasmResult<Result<String, types::IfaceError>> {
279        host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
280    }
281
282    fn state_set(
283        &mut self,
284        key: iface_types::StateKey,
285        value_json: String,
286        ctx: Option<types::TenantCtx>,
287    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
288        host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
289    }
290
291    fn session_update(
292        &mut self,
293        cursor: iface_types::SessionCursor,
294        ctx: Option<types::TenantCtx>,
295    ) -> WasmResult<Result<String, types::IfaceError>> {
296        host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
297    }
298}
299
300impl host_import_v0_2::HostImports for ComponentState {
301    fn secrets_get(
302        &mut self,
303        key: String,
304        ctx: Option<LegacyTenantCtx>,
305    ) -> WasmResult<Result<String, LegacyIfaceError>> {
306        host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
307    }
308
309    fn telemetry_emit(
310        &mut self,
311        span_json: String,
312        ctx: Option<LegacyTenantCtx>,
313    ) -> WasmResult<()> {
314        host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
315    }
316
317    fn tool_invoke(
318        &mut self,
319        tool: String,
320        action: String,
321        args_json: String,
322        ctx: Option<LegacyTenantCtx>,
323    ) -> WasmResult<Result<String, LegacyIfaceError>> {
324        host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
325    }
326
327    fn http_fetch(
328        &mut self,
329        req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
330        ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
331    ) -> WasmResult<
332        Result<
333            host_import_v0_2::greentic::host_import::imports::HttpResponse,
334            host_import_v0_2::greentic::host_import::imports::IfaceError,
335        >,
336    > {
337        host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
338    }
339}
340
341impl host_import_v0_6::HostImports for HostState {
342    fn secrets_get(
343        &mut self,
344        key: String,
345        _ctx: Option<types::TenantCtx>,
346    ) -> WasmResult<Result<String, types::IfaceError>> {
347        Ok(self.get_secret(&key).map_err(|err| {
348            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
349            types::IfaceError::Denied
350        }))
351    }
352
353    fn telemetry_emit(
354        &mut self,
355        span_json: String,
356        _ctx: Option<types::TenantCtx>,
357    ) -> WasmResult<()> {
358        if let Some(mock) = &self.mocks
359            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
360        {
361            return Ok(());
362        }
363        tracing::info!(span = %span_json, "telemetry emit from pack");
364        Ok(())
365    }
366
367    fn http_fetch(
368        &mut self,
369        req: host_import_v0_6::http::HttpRequest,
370        _ctx: Option<types::TenantCtx>,
371    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
372        let legacy_req = LegacyHttpRequest {
373            method: req.method,
374            url: req.url,
375            headers_json: req.headers_json,
376            body: req.body,
377        };
378        match greentic_interfaces::host_import_v0_2::HostImports::http_fetch(
379            self, legacy_req, None,
380        )? {
381            Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
382                status: resp.status,
383                headers_json: resp.headers_json,
384                body: resp.body,
385            })),
386            Err(err) => Ok(Err(map_legacy_error(err))),
387        }
388    }
389
390    fn mcp_exec(
391        &mut self,
392        component: String,
393        action: String,
394        args_json: String,
395        ctx: Option<types::TenantCtx>,
396    ) -> WasmResult<Result<String, types::IfaceError>> {
397        #[cfg(not(feature = "mcp"))]
398        {
399            let _ = (component, action, args_json, ctx);
400            tracing::warn!("mcp.exec requested but crate built without `mcp` feature");
401            return Ok(Err(types::IfaceError::Unavailable));
402        }
403        #[cfg(feature = "mcp")]
404        {
405            let exec_config = match &self.exec_config {
406                Some(cfg) => cfg.clone(),
407                None => {
408                    tracing::warn!(component = %component, action = %action, "exec config unavailable for tool invoke");
409                    return Ok(Err(types::IfaceError::Unavailable));
410                }
411            };
412            let args: Value = match serde_json::from_str(&args_json) {
413                Ok(value) => value,
414                Err(err) => {
415                    tracing::warn!(error = %err, "invalid args for mcp.exec node");
416                    return Ok(Err(types::IfaceError::InvalidArg));
417                }
418            };
419            let tenant = match self.tenant_ctx_from_v6(ctx) {
420                Ok(ctx) => Some(ctx),
421                Err(err) => {
422                    tracing::warn!(error = %err, "failed to parse tenant context for mcp.exec");
423                    None
424                }
425            };
426            let request = ExecRequest {
427                component: component.clone(),
428                action: action.clone(),
429                args,
430                tenant,
431            };
432            match greentic_mcp::exec(request, &exec_config) {
433                Ok(value) => match serde_json::to_string(&value) {
434                    Ok(body) => Ok(Ok(body)),
435                    Err(err) => {
436                        tracing::error!(error = %err, "failed to serialise tool result");
437                        Ok(Err(types::IfaceError::Internal))
438                    }
439                },
440                Err(err) => {
441                    tracing::warn!(component = %component, action = %action, error = %err, "mcp exec failed");
442                    let iface_err = match err {
443                        ExecError::NotFound { .. } => types::IfaceError::NotFound,
444                        ExecError::Tool { .. } => types::IfaceError::Denied,
445                        _ => types::IfaceError::Unavailable,
446                    };
447                    Ok(Err(iface_err))
448                }
449            }
450        }
451    }
452
453    fn state_get(
454        &mut self,
455        key: iface_types::StateKey,
456        ctx: Option<types::TenantCtx>,
457    ) -> WasmResult<Result<String, types::IfaceError>> {
458        let store = match self.state_store_handle() {
459            Ok(store) => store,
460            Err(err) => return Ok(Err(err)),
461        };
462        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
463            Ok(ctx) => ctx,
464            Err(err) => {
465                tracing::warn!(error = %err, "invalid tenant context for state.get");
466                return Ok(Err(types::IfaceError::InvalidArg));
467            }
468        };
469        let key = StoreStateKey::from(key);
470        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
471            Ok(Some(value)) => {
472                let result = if let Some(text) = value.as_str() {
473                    text.to_string()
474                } else {
475                    serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
476                };
477                Ok(Ok(result))
478            }
479            Ok(None) => Ok(Err(types::IfaceError::NotFound)),
480            Err(err) => {
481                tracing::warn!(error = %err, "state.get failed");
482                Ok(Err(types::IfaceError::Internal))
483            }
484        }
485    }
486
487    fn state_set(
488        &mut self,
489        key: iface_types::StateKey,
490        value_json: String,
491        ctx: Option<types::TenantCtx>,
492    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
493        let store = match self.state_store_handle() {
494            Ok(store) => store,
495            Err(err) => return Ok(Err(err)),
496        };
497        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
498            Ok(ctx) => ctx,
499            Err(err) => {
500                tracing::warn!(error = %err, "invalid tenant context for state.set");
501                return Ok(Err(types::IfaceError::InvalidArg));
502            }
503        };
504        let key = StoreStateKey::from(key);
505        let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
506        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
507            Ok(()) => Ok(Ok(state::OpAck::Ok)),
508            Err(err) => {
509                tracing::warn!(error = %err, "state.set failed");
510                Ok(Err(types::IfaceError::Internal))
511            }
512        }
513    }
514
515    fn session_update(
516        &mut self,
517        cursor: iface_types::SessionCursor,
518        ctx: Option<types::TenantCtx>,
519    ) -> WasmResult<Result<String, types::IfaceError>> {
520        let store = match self.session_store_handle() {
521            Ok(store) => store,
522            Err(err) => return Ok(Err(err)),
523        };
524        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
525            Ok(ctx) => ctx,
526            Err(err) => {
527                tracing::warn!(error = %err, "invalid tenant context for session.update");
528                return Ok(Err(types::IfaceError::InvalidArg));
529            }
530        };
531        let user = match Self::ensure_user(&tenant_ctx) {
532            Ok(user) => user,
533            Err(err) => return Ok(Err(err)),
534        };
535        let flow_id = match Self::ensure_flow(&tenant_ctx) {
536            Ok(flow) => flow,
537            Err(err) => return Ok(Err(err)),
538        };
539        let cursor = Self::cursor_from_iface(cursor);
540        let payload = SessionData {
541            tenant_ctx: tenant_ctx.clone(),
542            flow_id,
543            cursor: cursor.clone(),
544            context_json: serde_json::json!({
545                "node_pointer": cursor.node_pointer,
546                "wait_reason": cursor.wait_reason,
547                "outbox_marker": cursor.outbox_marker,
548            })
549            .to_string(),
550        };
551        if let Some(existing) = tenant_ctx.session_id() {
552            let key = StoreSessionKey::from(existing.to_string());
553            if let Err(err) = store.update_session(&key, payload) {
554                tracing::error!(error = %err, "failed to update session snapshot");
555                return Ok(Err(types::IfaceError::Internal));
556            }
557            return Ok(Ok(existing.to_string()));
558        }
559        match store.find_by_user(&tenant_ctx, &user) {
560            Ok(Some((key, _))) => {
561                if let Err(err) = store.update_session(&key, payload) {
562                    tracing::error!(error = %err, "failed to update existing user session");
563                    return Ok(Err(types::IfaceError::Internal));
564                }
565                return Ok(Ok(key.to_string()));
566            }
567            Ok(None) => {}
568            Err(err) => {
569                tracing::error!(error = %err, "session lookup failed");
570                return Ok(Err(types::IfaceError::Internal));
571            }
572        }
573        let key = match store.create_session(&tenant_ctx, payload.clone()) {
574            Ok(key) => key,
575            Err(err) => {
576                tracing::error!(error = %err, "failed to create session");
577                return Ok(Err(types::IfaceError::Internal));
578            }
579        };
580        let ctx_with_session = tenant_ctx.with_session(key.to_string());
581        let updated_payload = SessionData {
582            tenant_ctx: ctx_with_session.clone(),
583            ..payload
584        };
585        if let Err(err) = store.update_session(&key, updated_payload) {
586            tracing::warn!(error = %err, "failed to stamp session id after create");
587        }
588        Ok(Ok(key.to_string()))
589    }
590}
591
592impl greentic_interfaces::host_import_v0_2::HostImports for HostState {
593    fn secrets_get(
594        &mut self,
595        key: String,
596        _ctx: Option<LegacyTenantCtx>,
597    ) -> WasmResult<Result<String, LegacyIfaceError>> {
598        Ok(self.get_secret(&key).map_err(|err| {
599            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
600            LegacyIfaceError::Denied
601        }))
602    }
603
604    fn telemetry_emit(
605        &mut self,
606        span_json: String,
607        _ctx: Option<LegacyTenantCtx>,
608    ) -> WasmResult<()> {
609        if let Some(mock) = &self.mocks
610            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
611        {
612            return Ok(());
613        }
614        tracing::info!(span = %span_json, "telemetry emit from pack");
615        Ok(())
616    }
617
618    fn tool_invoke(
619        &mut self,
620        tool: String,
621        action: String,
622        args_json: String,
623        ctx: Option<LegacyTenantCtx>,
624    ) -> WasmResult<Result<String, LegacyIfaceError>> {
625        #[cfg(not(feature = "mcp"))]
626        {
627            let _ = (tool, action, args_json, ctx);
628            tracing::warn!("tool invoke requested but crate built without `mcp` feature");
629            return Ok(Err(LegacyIfaceError::Unavailable));
630        }
631
632        #[cfg(feature = "mcp")]
633        {
634            let exec_config = match &self.exec_config {
635                Some(cfg) => cfg.clone(),
636                None => {
637                    tracing::warn!(%tool, %action, "exec config unavailable for tool invoke");
638                    return Ok(Err(LegacyIfaceError::Unavailable));
639                }
640            };
641
642            let args: Value = match serde_json::from_str(&args_json) {
643                Ok(value) => value,
644                Err(err) => {
645                    tracing::warn!(error = %err, "invalid args for tool invoke");
646                    return Ok(Err(LegacyIfaceError::InvalidArg));
647                }
648            };
649
650            let tenant = ctx.map(|ctx| map_legacy_tenant_ctx(ctx, &self.default_env));
651
652            let request = ExecRequest {
653                component: tool.clone(),
654                action: action.clone(),
655                args,
656                tenant,
657            };
658
659            if let Some(mock) = &self.mocks
660                && let Some(result) = mock.tool_short_circuit(&tool, &action)
661            {
662                return match result.and_then(|value| {
663                    serde_json::to_string(&value)
664                        .map_err(|err| anyhow!("failed to serialise mock tool output: {err}"))
665                }) {
666                    Ok(body) => Ok(Ok(body)),
667                    Err(err) => {
668                        tracing::error!(error = %err, "mock tool execution failed");
669                        Ok(Err(LegacyIfaceError::Internal))
670                    }
671                };
672            }
673
674            match greentic_mcp::exec(request, &exec_config) {
675                Ok(value) => match serde_json::to_string(&value) {
676                    Ok(body) => Ok(Ok(body)),
677                    Err(err) => {
678                        tracing::error!(error = %err, "failed to serialise tool result");
679                        Ok(Err(LegacyIfaceError::Internal))
680                    }
681                },
682                Err(err) => {
683                    tracing::warn!(%tool, %action, error = %err, "tool invoke failed");
684                    let iface_err = match err {
685                        ExecError::NotFound { .. } => LegacyIfaceError::NotFound,
686                        ExecError::Tool { .. } => LegacyIfaceError::Denied,
687                        _ => LegacyIfaceError::Unavailable,
688                    };
689                    Ok(Err(iface_err))
690                }
691            }
692        }
693    }
694
695    fn http_fetch(
696        &mut self,
697        req: LegacyHttpRequest,
698        _ctx: Option<LegacyTenantCtx>,
699    ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
700        if !self.config.http_enabled {
701            tracing::warn!(url = %req.url, "http fetch denied by policy");
702            return Ok(Err(LegacyIfaceError::Denied));
703        }
704
705        let mut mock_state = None;
706        let raw_body = req.body.clone();
707        if let Some(mock) = &self.mocks
708            && let Ok(meta) = HttpMockRequest::new(
709                &req.method,
710                &req.url,
711                raw_body.as_deref().map(|body| body.as_bytes()),
712            )
713        {
714            match mock.http_begin(&meta) {
715                HttpDecision::Mock(response) => {
716                    let http = LegacyHttpResponse::from(&response);
717                    return Ok(Ok(http));
718                }
719                HttpDecision::Deny(reason) => {
720                    tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
721                    return Ok(Err(LegacyIfaceError::Denied));
722                }
723                HttpDecision::Passthrough { record } => {
724                    mock_state = Some((meta, record));
725                }
726            }
727        }
728
729        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
730        let mut builder = self.http_client.request(method, &req.url);
731
732        if let Some(headers_json) = req.headers_json.as_ref() {
733            match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
734                Ok(map) => {
735                    for (key, value) in map {
736                        if let Some(val) = value.as_str()
737                            && let Ok(header) =
738                                reqwest::header::HeaderName::from_bytes(key.as_bytes())
739                            && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
740                        {
741                            builder = builder.header(header, header_value);
742                        }
743                    }
744                }
745                Err(err) => {
746                    tracing::warn!(error = %err, "failed to parse headers for http.fetch");
747                }
748            }
749        }
750
751        if let Some(body) = raw_body.clone() {
752            builder = builder.body(body);
753        }
754
755        let response = match builder.send() {
756            Ok(resp) => resp,
757            Err(err) => {
758                tracing::error!(url = %req.url, error = %err, "http fetch failed");
759                return Ok(Err(LegacyIfaceError::Unavailable));
760            }
761        };
762
763        let status = response.status().as_u16();
764        let headers_map = response
765            .headers()
766            .iter()
767            .map(|(k, v)| {
768                (
769                    k.as_str().to_string(),
770                    v.to_str().unwrap_or_default().to_string(),
771                )
772            })
773            .collect::<BTreeMap<_, _>>();
774        let headers_json = serde_json::to_string(&headers_map).ok();
775        let body = response.text().ok();
776
777        if let Some((meta, true)) = mock_state.take()
778            && let Some(mock) = &self.mocks
779        {
780            let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
781            mock.http_record(&meta, &recorded);
782        }
783
784        Ok(Ok(LegacyHttpResponse {
785            status,
786            headers_json,
787            body,
788        }))
789    }
790}
791
792impl PackRuntime {
793    #[allow(clippy::too_many_arguments)]
794    pub async fn load(
795        path: impl AsRef<Path>,
796        config: Arc<HostConfig>,
797        mocks: Option<Arc<MockLayer>>,
798        archive_source: Option<&Path>,
799        session_store: Option<DynSessionStore>,
800        state_store: Option<DynStateStore>,
801        wasi_policy: Arc<RunnerWasiPolicy>,
802        secrets: DynSecretsManager,
803        verify_archive: bool,
804    ) -> Result<Self> {
805        let path = path.as_ref();
806        let is_component = path
807            .extension()
808            .and_then(|ext| ext.to_str())
809            .map(|ext| ext.eq_ignore_ascii_case("wasm"))
810            .unwrap_or(false);
811        let archive_hint = archive_source.or(if is_component { None } else { Some(path) });
812        if verify_archive {
813            let verify_target = archive_hint.unwrap_or(path);
814            verify::verify_pack(verify_target).await?;
815            tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
816        }
817        let engine = Engine::default();
818        let wasm_bytes = fs::read(path).await?;
819        let mut metadata =
820            PackMetadata::from_wasm(&wasm_bytes).unwrap_or_else(|| PackMetadata::fallback(path));
821        let mut flows = if let Some(archive_path) = archive_hint {
822            match PackFlows::from_archive(archive_path) {
823                Ok(cache) => {
824                    metadata = cache.metadata.clone();
825                    Some(cache)
826                }
827                Err(err) => {
828                    warn!(
829                        error = %err,
830                        pack = %archive_path.display(),
831                        "failed to parse pack flows via greentic-pack; falling back to component exports"
832                    );
833                    None
834                }
835            }
836        } else {
837            None
838        };
839        let component = match Component::from_file(&engine, path) {
840            Ok(component) => Some(component),
841            Err(err) => {
842                if let Some(archive_path) = archive_hint {
843                    if flows.is_none() {
844                        let archive_data =
845                            PackFlows::from_archive(archive_path).with_context(|| {
846                                format!(
847                                    "failed to build flow cache from {}",
848                                    archive_path.display()
849                                )
850                            })?;
851                        metadata = archive_data.metadata.clone();
852                        flows = Some(archive_data);
853                    }
854                    warn!(
855                        error = %err,
856                        pack = %archive_path.display(),
857                        "component load failed; continuing with cached flow metadata"
858                    );
859                    None
860                } else {
861                    return Err(err);
862                }
863            }
864        };
865        Ok(Self {
866            path: path.to_path_buf(),
867            config,
868            engine,
869            component,
870            metadata,
871            mocks,
872            flows,
873            session_store,
874            state_store,
875            wasi_policy,
876            secrets,
877        })
878    }
879
880    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
881        if let Some(cache) = &self.flows {
882            return Ok(cache.descriptors.clone());
883        }
884        tracing::trace!(
885            tenant = %self.config.tenant,
886            pack_path = %self.path.display(),
887            "listing flows from pack"
888        );
889        let component = self
890            .component
891            .as_ref()
892            .ok_or_else(|| anyhow!("pack component unavailable"))?;
893        let host_state = HostState::new(
894            Arc::clone(&self.config),
895            self.mocks.clone(),
896            self.session_store.clone(),
897            self.state_store.clone(),
898            Arc::clone(&self.secrets),
899        )?;
900        let mut store = Store::new(
901            &self.engine,
902            ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?,
903        );
904        let mut linker = Linker::new(&self.engine);
905        imports::register_all(&mut linker)?;
906        let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
907        let exports = bindings.greentic_pack_export_exports();
908        let flows_raw = match exports.call_list_flows(&mut store)? {
909            Ok(flows) => flows,
910            Err(err) => {
911                bail!("pack list_flows failed: {err:?}");
912            }
913        };
914        let flows = flows_raw
915            .into_iter()
916            .map(|flow: FlowInfo| {
917                let profile = flow.profile.clone();
918                let version = flow.version.clone();
919                FlowDescriptor {
920                    id: flow.id,
921                    flow_type: flow.flow_type,
922                    profile,
923                    version,
924                    description: Some(format!("{}@{}", flow.profile, flow.version)),
925                }
926            })
927            .collect();
928        Ok(flows)
929    }
930
931    #[allow(dead_code)]
932    pub async fn run_flow(
933        &self,
934        _flow_id: &str,
935        _input: serde_json::Value,
936    ) -> Result<serde_json::Value> {
937        // TODO: dispatch flow execution via Wasmtime
938        Ok(serde_json::json!({}))
939    }
940
941    pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
942        if let Some(cache) = &self.flows {
943            return cache
944                .flows
945                .get(flow_id)
946                .cloned()
947                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
948        }
949        let component = self
950            .component
951            .as_ref()
952            .ok_or_else(|| anyhow!("pack component unavailable"))?;
953        let host_state = HostState::new(
954            Arc::clone(&self.config),
955            self.mocks.clone(),
956            self.session_store.clone(),
957            self.state_store.clone(),
958            Arc::clone(&self.secrets),
959        )?;
960        let mut store = Store::new(
961            &self.engine,
962            ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?,
963        );
964        let mut linker = Linker::new(&self.engine);
965        imports::register_all(&mut linker)?;
966        let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
967        let exports = bindings.greentic_pack_export_exports();
968        let flow_name = flow_id.to_string();
969        let metadata = match exports.call_flow_metadata(&mut store, &flow_name)? {
970            Ok(doc) => doc,
971            Err(err) => bail!("pack flow_metadata({flow_id}) failed: {err:?}"),
972        };
973        let flow_doc: greentic_flow::model::FlowDoc = serde_json::from_str(&metadata)
974            .or_else(|_| serde_yaml::from_str(&metadata))
975            .with_context(|| format!("failed to parse flow metadata for {flow_id}"))?;
976        let ir = greentic_flow::to_ir(flow_doc)?;
977        Ok(ir)
978    }
979
980    pub fn metadata(&self) -> &PackMetadata {
981        &self.metadata
982    }
983}
984
/// Converts a legacy tenant context into a [`TypesTenantCtx`].
///
/// The environment comes from `ctx.deployment.runtime` when present and from
/// `default_env` otherwise. Team and user identifiers that are absent or fail
/// to parse are passed on as `None`. The trace id is copied over unchanged.
///
/// # Panics
///
/// Panics when the environment or tenant string is rejected by the respective
/// `from_str` implementation.
#[cfg(feature = "mcp")]
fn map_legacy_tenant_ctx(ctx: LegacyTenantCtx, default_env: &str) -> TypesTenantCtx {
    let env = match ctx.deployment.runtime {
        Some(runtime) => runtime,
        None => default_env.to_string(),
    };
    let env_id = EnvId::from_str(env.as_str()).expect("invalid env id");
    let tenant_id = TenantId::from_str(ctx.tenant.as_str()).expect("invalid tenant id");

    let team = ctx
        .team
        .as_ref()
        .and_then(|team| TeamId::from_str(team.as_str()).ok());
    let user = ctx
        .user
        .as_ref()
        .and_then(|user| UserId::from_str(user.as_str()).ok());

    let mut mapped = TypesTenantCtx::new(env_id, tenant_id)
        .with_team(team)
        .with_user(user);
    mapped.trace_id = ctx.trace_id;
    mapped
}
1008
1009fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
1010    match err {
1011        LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
1012        LegacyIfaceError::NotFound => types::IfaceError::NotFound,
1013        LegacyIfaceError::Denied => types::IfaceError::Denied,
1014        LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
1015        LegacyIfaceError::Internal => types::IfaceError::Internal,
1016    }
1017}
1018
1019struct PackFlows {
1020    descriptors: Vec<FlowDescriptor>,
1021    flows: HashMap<String, FlowIR>,
1022    metadata: PackMetadata,
1023}
1024
1025impl PackFlows {
1026    fn from_archive(path: &Path) -> Result<Self> {
1027        let pack = open_pack(path, SigningPolicy::DevOk)
1028            .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1029        let file = File::open(path)
1030            .with_context(|| format!("failed to open archive {}", path.display()))?;
1031        let mut archive = ZipArchive::new(file)
1032            .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1033        let mut flows = HashMap::new();
1034        let mut descriptors = Vec::new();
1035        for flow in &pack.manifest.flows {
1036            match load_flow_entry(&mut archive, flow, &pack.manifest.meta) {
1037                Ok((descriptor, ir)) => {
1038                    flows.insert(descriptor.id.clone(), ir);
1039                    descriptors.push(descriptor);
1040                }
1041                Err(err) => {
1042                    warn!(
1043                        error = %err,
1044                        flow_id = flow.id,
1045                        "failed to parse flow metadata from archive; using stub flow"
1046                    );
1047                    let stub = build_stub_flow_ir(&flow.id, &flow.kind);
1048                    flows.insert(flow.id.clone(), stub.clone());
1049                    descriptors.push(FlowDescriptor {
1050                        id: flow.id.clone(),
1051                        flow_type: flow.kind.clone(),
1052                        profile: pack.manifest.meta.name.clone(),
1053                        version: pack.manifest.meta.version.to_string(),
1054                        description: Some(flow.kind.clone()),
1055                    });
1056                }
1057            }
1058        }
1059
1060        Ok(Self {
1061            metadata: PackMetadata::from_manifest(&pack.manifest),
1062            descriptors,
1063            flows,
1064        })
1065    }
1066}
1067
1068fn load_flow_entry(
1069    archive: &mut ZipArchive<File>,
1070    entry: &greentic_pack::builder::FlowEntry,
1071    meta: &greentic_pack::builder::PackMeta,
1072) -> Result<(FlowDescriptor, FlowIR)> {
1073    let doc = load_flow_doc(archive, entry)?;
1074    let ir = greentic_flow::to_ir(doc.clone())
1075        .with_context(|| format!("failed to build IR for flow {}", doc.id))?;
1076    let descriptor = FlowDescriptor {
1077        id: doc.id.clone(),
1078        flow_type: doc.flow_type.clone(),
1079        profile: meta.name.clone(),
1080        version: meta.version.to_string(),
1081        description: doc
1082            .description
1083            .clone()
1084            .or(doc.title.clone())
1085            .or_else(|| Some(entry.kind.clone())),
1086    };
1087    Ok((descriptor, ir))
1088}
1089
1090fn load_flow_doc(
1091    archive: &mut ZipArchive<File>,
1092    entry: &greentic_pack::builder::FlowEntry,
1093) -> Result<greentic_flow::model::FlowDoc> {
1094    if let Ok(bytes) = read_entry(archive, &entry.file_yaml)
1095        && let Ok(doc) = serde_yaml::from_slice(&bytes)
1096    {
1097        return Ok(doc);
1098    }
1099    let json_bytes = read_entry(archive, &entry.file_json)
1100        .with_context(|| format!("missing flow document {}", entry.file_json))?;
1101    let doc = serde_json::from_slice(&json_bytes)
1102        .with_context(|| format!("failed to parse {}", entry.file_json))?;
1103    Ok(doc)
1104}
1105
1106fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1107    let mut file = archive
1108        .by_name(name)
1109        .with_context(|| format!("entry {name} missing from archive"))?;
1110    let mut buf = Vec::new();
1111    file.read_to_end(&mut buf)?;
1112    Ok(buf)
1113}
1114
1115#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1116pub struct PackMetadata {
1117    pub pack_id: String,
1118    pub version: String,
1119    #[serde(default)]
1120    pub entry_flows: Vec<String>,
1121}
1122
1123impl PackMetadata {
1124    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1125        let parser = Parser::new(0);
1126        for payload in parser.parse_all(bytes) {
1127            let payload = payload.ok()?;
1128            match payload {
1129                Payload::CustomSection(section) => {
1130                    if section.name() == "greentic.manifest"
1131                        && let Ok(meta) = Self::from_bytes(section.data())
1132                    {
1133                        return Some(meta);
1134                    }
1135                }
1136                Payload::DataSection(reader) => {
1137                    for segment in reader.into_iter().flatten() {
1138                        if let Ok(meta) = Self::from_bytes(segment.data) {
1139                            return Some(meta);
1140                        }
1141                    }
1142                }
1143                _ => {}
1144            }
1145        }
1146        None
1147    }
1148
1149    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1150        #[derive(Deserialize)]
1151        struct RawManifest {
1152            pack_id: String,
1153            version: String,
1154            #[serde(default)]
1155            entry_flows: Vec<String>,
1156            #[serde(default)]
1157            flows: Vec<RawFlow>,
1158        }
1159
1160        #[derive(Deserialize)]
1161        struct RawFlow {
1162            id: String,
1163        }
1164
1165        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1166        let mut entry_flows = if manifest.entry_flows.is_empty() {
1167            manifest.flows.iter().map(|f| f.id.clone()).collect()
1168        } else {
1169            manifest.entry_flows.clone()
1170        };
1171        entry_flows.retain(|id| !id.is_empty());
1172        Ok(Self {
1173            pack_id: manifest.pack_id,
1174            version: manifest.version,
1175            entry_flows,
1176        })
1177    }
1178
1179    pub fn fallback(path: &Path) -> Self {
1180        let pack_id = path
1181            .file_stem()
1182            .map(|s| s.to_string_lossy().into_owned())
1183            .unwrap_or_else(|| "unknown-pack".to_string());
1184        Self {
1185            pack_id,
1186            version: "0.0.0".to_string(),
1187            entry_flows: Vec::new(),
1188        }
1189    }
1190
1191    pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1192        let entry_flows = if manifest.meta.entry_flows.is_empty() {
1193            manifest
1194                .flows
1195                .iter()
1196                .map(|flow| flow.id.clone())
1197                .collect::<Vec<_>>()
1198        } else {
1199            manifest.meta.entry_flows.clone()
1200        };
1201        Self {
1202            pack_id: manifest.meta.pack_id.clone(),
1203            version: manifest.meta.version.to_string(),
1204            entry_flows,
1205        }
1206    }
1207}
1208
1209fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1210    let mut nodes = IndexMap::new();
1211    nodes.insert(
1212        "complete".into(),
1213        NodeIR {
1214            component: "qa.process".into(),
1215            payload_expr: serde_json::json!({
1216                "status": "done",
1217                "flow_id": flow_id,
1218            }),
1219            routes: vec![RouteIR {
1220                to: None,
1221                out: true,
1222            }],
1223        },
1224    );
1225    FlowIR {
1226        id: flow_id.to_string(),
1227        flow_type: flow_type.to_string(),
1228        start: Some("complete".into()),
1229        parameters: Value::Object(Default::default()),
1230        nodes,
1231    }
1232}
1233
1234impl From<&HttpMockResponse> for LegacyHttpResponse {
1235    fn from(value: &HttpMockResponse) -> Self {
1236        let headers_json = serde_json::to_string(&value.headers).ok();
1237        Self {
1238            status: value.status,
1239            headers_json,
1240            body: value.body.clone(),
1241        }
1242    }
1243}