greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, Store, WasmResult};
9use anyhow::{Context, Result, anyhow, bail};
10use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
11use greentic_interfaces::host_import_v0_2;
12use greentic_interfaces::host_import_v0_2::greentic::host_import::imports::{
13    HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
14    IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
15};
16use greentic_interfaces::host_import_v0_6::{self, iface_types, state, types};
17use greentic_interfaces::pack_export_v0_2;
18use greentic_interfaces::pack_export_v0_2::exports::greentic::pack_export::exports::FlowInfo;
19#[cfg(feature = "mcp")]
20use greentic_mcp::{ExecConfig, ExecError, ExecRequest};
21use greentic_session::SessionKey as StoreSessionKey;
22use greentic_types::{
23    EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
24    TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
25};
26use indexmap::IndexMap;
27use reqwest::blocking::Client as BlockingClient;
28use serde::{Deserialize, Serialize};
29use serde_cbor;
30use serde_json::{self, Value, json};
31use serde_yaml_bw as serde_yaml;
32use tokio::fs;
33use wasmparser::{Parser, Payload};
34use zip::ZipArchive;
35
36use crate::imports;
37use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
38
39use crate::config::HostConfig;
40use crate::storage::state::STATE_PREFIX;
41use crate::storage::{DynSessionStore, DynStateStore};
42use crate::verify;
43use crate::wasi::RunnerWasiPolicy;
44use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
45
/// Runtime handle for a single loaded pack.
///
/// Holds either a compiled component or, when the component could not be
/// loaded and an archive was supplied, the flow data read from that archive.
pub struct PackRuntime {
    /// Path the pack was loaded from.
    path: PathBuf,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Engine the component is compiled against and instantiated with.
    engine: Engine,
    /// Compiled component; `None` when loading fell back to the archive.
    component: Option<Component>,
    /// Pack metadata (from the wasm bytes, a path-based fallback, or the archive).
    metadata: PackMetadata,
    /// Optional mock layer handed to every `HostState` built for this pack.
    mocks: Option<Arc<MockLayer>>,
    /// Flow data read from the pack archive; only set on the fallback path.
    archive: Option<ArchiveFlows>,
    /// Optional session store handed to every `HostState` built for this pack.
    session_store: Option<DynSessionStore>,
    /// Optional state store handed to every `HostState` built for this pack.
    state_store: Option<DynStateStore>,
    /// WASI policy used to build the WASI context of each component instance.
    wasi_policy: Arc<RunnerWasiPolicy>,
}
58
/// Serializable description of a flow exposed by a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Flow profile.
    pub profile: String,
    /// Flow version.
    pub version: String,
    /// Optional description; defaults to `None` when absent from the input.
    #[serde(default)]
    pub description: Option<String>,
}
69
/// Host-side state backing the host import interfaces exposed to a pack.
pub struct HostState {
    /// Shared host configuration (tenant, secrets policy, HTTP policy).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used by `http_fetch`.
    http_client: BlockingClient,
    /// MCP execution config; `None` when it could not be derived from `config`.
    #[cfg(feature = "mcp")]
    exec_config: Option<ExecConfig>,
    /// Environment name, read from `GREENTIC_ENV` (falls back to `local`).
    default_env: String,
    /// Optional session store; its absence is reported as `Unavailable`.
    session_store: Option<DynSessionStore>,
    /// Optional state store; its absence is reported as `Unavailable`.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted for secrets, telemetry, tools and HTTP.
    mocks: Option<Arc<MockLayer>>,
}
80
81impl HostState {
82    pub fn new(
83        config: Arc<HostConfig>,
84        mocks: Option<Arc<MockLayer>>,
85        session_store: Option<DynSessionStore>,
86        state_store: Option<DynStateStore>,
87    ) -> Result<Self> {
88        let http_client = BlockingClient::builder().build()?;
89        #[cfg(feature = "mcp")]
90        let exec_config = config.mcp_exec_config().ok();
91        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
92        Ok(Self {
93            config,
94            http_client,
95            #[cfg(feature = "mcp")]
96            exec_config,
97            default_env,
98            session_store,
99            state_store,
100            mocks,
101        })
102    }
103
104    pub fn get_secret(&self, key: &str) -> Result<String> {
105        if !self.config.secrets_policy.is_allowed(key) {
106            bail!("secret {key} is not permitted by bindings policy");
107        }
108        if let Some(mock) = &self.mocks
109            && let Some(value) = mock.secrets_lookup(key)
110        {
111            return Ok(value);
112        }
113        if let Ok(value) = std::env::var(key) {
114            return Ok(value);
115        }
116        bail!("secret {key} not found in environment");
117    }
118
119    fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
120        let tenant_raw = ctx
121            .as_ref()
122            .map(|ctx| ctx.tenant.clone())
123            .unwrap_or_else(|| self.config.tenant.clone());
124        let tenant_id = TenantId::from_str(&tenant_raw)
125            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
126        let env_id = EnvId::from_str(&self.default_env)
127            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
128        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
129        if let Some(ctx) = ctx {
130            if let Some(team) = ctx.team {
131                let team_id =
132                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
133                tenant_ctx = tenant_ctx.with_team(Some(team_id));
134            }
135            if let Some(user) = ctx.user {
136                let user_id =
137                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
138                tenant_ctx = tenant_ctx.with_user(Some(user_id));
139            }
140            if let Some(flow) = ctx.flow_id {
141                tenant_ctx = tenant_ctx.with_flow(flow);
142            }
143            if let Some(node) = ctx.node_id {
144                tenant_ctx = tenant_ctx.with_node(node);
145            }
146            if let Some(provider) = ctx.provider_id {
147                tenant_ctx = tenant_ctx.with_provider(provider);
148            }
149            if let Some(session) = ctx.session_id {
150                tenant_ctx = tenant_ctx.with_session(session);
151            }
152            tenant_ctx.trace_id = ctx.trace_id;
153        }
154        Ok(tenant_ctx)
155    }
156
157    fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
158        self.session_store
159            .as_ref()
160            .cloned()
161            .ok_or(types::IfaceError::Unavailable)
162    }
163
164    fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
165        self.state_store
166            .as_ref()
167            .cloned()
168            .ok_or(types::IfaceError::Unavailable)
169    }
170
171    fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
172        ctx.user_id
173            .clone()
174            .or_else(|| ctx.user.clone())
175            .ok_or(types::IfaceError::InvalidArg)
176    }
177
178    fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
179        let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
180        FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
181    }
182
183    fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
184        let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
185        if let Some(reason) = cursor.wait_reason {
186            store_cursor = store_cursor.with_wait_reason(reason);
187        }
188        if let Some(marker) = cursor.outbox_marker {
189            store_cursor = store_cursor.with_outbox_marker(marker);
190        }
191        store_cursor
192    }
193}
194
/// Per-store state for a component instance: the host import state plus the
/// WASI context and resource table exposed through `WasiView`.
pub struct ComponentState {
    /// Host import implementation that all import calls are forwarded to.
    host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table handed out alongside the WASI context.
    resource_table: ResourceTable,
}
200
201impl ComponentState {
202    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
203        let wasi_ctx = policy
204            .instantiate()
205            .context("failed to build WASI context")?;
206        Ok(Self {
207            host,
208            wasi_ctx,
209            resource_table: ResourceTable::new(),
210        })
211    }
212
213    fn host_mut(&mut self) -> &mut HostState {
214        &mut self.host
215    }
216}
217
218impl WasiView for ComponentState {
219    fn ctx(&mut self) -> WasiCtxView<'_> {
220        WasiCtxView {
221            ctx: &mut self.wasi_ctx,
222            table: &mut self.resource_table,
223        }
224    }
225}
226
// NOTE(review): `Send` and `Sync` are asserted manually for `ComponentState`
// instead of being derived by the compiler.
// SAFETY: not established in this file. Soundness depends on every field
// (`HostState`, `WasiCtx`, `ResourceTable`) being safe to move to and share
// between threads. TODO: confirm and record the invariant here.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
231
232impl host_import_v0_6::HostImports for ComponentState {
233    fn secrets_get(
234        &mut self,
235        key: String,
236        ctx: Option<types::TenantCtx>,
237    ) -> WasmResult<Result<String, types::IfaceError>> {
238        host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
239    }
240
241    fn telemetry_emit(
242        &mut self,
243        span_json: String,
244        ctx: Option<types::TenantCtx>,
245    ) -> WasmResult<()> {
246        host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
247    }
248
249    fn http_fetch(
250        &mut self,
251        req: host_import_v0_6::http::HttpRequest,
252        ctx: Option<types::TenantCtx>,
253    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
254        host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
255    }
256
257    fn mcp_exec(
258        &mut self,
259        component: String,
260        action: String,
261        args_json: String,
262        ctx: Option<types::TenantCtx>,
263    ) -> WasmResult<Result<String, types::IfaceError>> {
264        host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
265    }
266
267    fn state_get(
268        &mut self,
269        key: iface_types::StateKey,
270        ctx: Option<types::TenantCtx>,
271    ) -> WasmResult<Result<String, types::IfaceError>> {
272        host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
273    }
274
275    fn state_set(
276        &mut self,
277        key: iface_types::StateKey,
278        value_json: String,
279        ctx: Option<types::TenantCtx>,
280    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
281        host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
282    }
283
284    fn session_update(
285        &mut self,
286        cursor: iface_types::SessionCursor,
287        ctx: Option<types::TenantCtx>,
288    ) -> WasmResult<Result<String, types::IfaceError>> {
289        host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
290    }
291}
292
293impl host_import_v0_2::HostImports for ComponentState {
294    fn secrets_get(
295        &mut self,
296        key: String,
297        ctx: Option<LegacyTenantCtx>,
298    ) -> WasmResult<Result<String, LegacyIfaceError>> {
299        host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
300    }
301
302    fn telemetry_emit(
303        &mut self,
304        span_json: String,
305        ctx: Option<LegacyTenantCtx>,
306    ) -> WasmResult<()> {
307        host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
308    }
309
310    fn tool_invoke(
311        &mut self,
312        tool: String,
313        action: String,
314        args_json: String,
315        ctx: Option<LegacyTenantCtx>,
316    ) -> WasmResult<Result<String, LegacyIfaceError>> {
317        host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
318    }
319
320    fn http_fetch(
321        &mut self,
322        req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
323        ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
324    ) -> WasmResult<
325        Result<
326            host_import_v0_2::greentic::host_import::imports::HttpResponse,
327            host_import_v0_2::greentic::host_import::imports::IfaceError,
328        >,
329    > {
330        host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
331    }
332}
333
334impl host_import_v0_6::HostImports for HostState {
335    fn secrets_get(
336        &mut self,
337        key: String,
338        _ctx: Option<types::TenantCtx>,
339    ) -> WasmResult<Result<String, types::IfaceError>> {
340        Ok(self.get_secret(&key).map_err(|err| {
341            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
342            types::IfaceError::Denied
343        }))
344    }
345
346    fn telemetry_emit(
347        &mut self,
348        span_json: String,
349        _ctx: Option<types::TenantCtx>,
350    ) -> WasmResult<()> {
351        if let Some(mock) = &self.mocks
352            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
353        {
354            return Ok(());
355        }
356        tracing::info!(span = %span_json, "telemetry emit from pack");
357        Ok(())
358    }
359
360    fn http_fetch(
361        &mut self,
362        req: host_import_v0_6::http::HttpRequest,
363        _ctx: Option<types::TenantCtx>,
364    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
365        let legacy_req = LegacyHttpRequest {
366            method: req.method,
367            url: req.url,
368            headers_json: req.headers_json,
369            body: req.body,
370        };
371        match greentic_interfaces::host_import_v0_2::HostImports::http_fetch(
372            self, legacy_req, None,
373        )? {
374            Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
375                status: resp.status,
376                headers_json: resp.headers_json,
377                body: resp.body,
378            })),
379            Err(err) => Ok(Err(map_legacy_error(err))),
380        }
381    }
382
383    fn mcp_exec(
384        &mut self,
385        component: String,
386        action: String,
387        args_json: String,
388        ctx: Option<types::TenantCtx>,
389    ) -> WasmResult<Result<String, types::IfaceError>> {
390        #[cfg(not(feature = "mcp"))]
391        {
392            let _ = (component, action, args_json, ctx);
393            tracing::warn!("mcp.exec requested but crate built without `mcp` feature");
394            return Ok(Err(types::IfaceError::Unavailable));
395        }
396        #[cfg(feature = "mcp")]
397        {
398            let exec_config = match &self.exec_config {
399                Some(cfg) => cfg.clone(),
400                None => {
401                    tracing::warn!(component = %component, action = %action, "exec config unavailable for tool invoke");
402                    return Ok(Err(types::IfaceError::Unavailable));
403                }
404            };
405            let args: Value = match serde_json::from_str(&args_json) {
406                Ok(value) => value,
407                Err(err) => {
408                    tracing::warn!(error = %err, "invalid args for mcp.exec node");
409                    return Ok(Err(types::IfaceError::InvalidArg));
410                }
411            };
412            let tenant = match self.tenant_ctx_from_v6(ctx) {
413                Ok(ctx) => Some(ctx),
414                Err(err) => {
415                    tracing::warn!(error = %err, "failed to parse tenant context for mcp.exec");
416                    None
417                }
418            };
419            let request = ExecRequest {
420                component: component.clone(),
421                action: action.clone(),
422                args,
423                tenant,
424            };
425            match greentic_mcp::exec(request, &exec_config) {
426                Ok(value) => match serde_json::to_string(&value) {
427                    Ok(body) => Ok(Ok(body)),
428                    Err(err) => {
429                        tracing::error!(error = %err, "failed to serialise tool result");
430                        Ok(Err(types::IfaceError::Internal))
431                    }
432                },
433                Err(err) => {
434                    tracing::warn!(component = %component, action = %action, error = %err, "mcp exec failed");
435                    let iface_err = match err {
436                        ExecError::NotFound { .. } => types::IfaceError::NotFound,
437                        ExecError::Tool { .. } => types::IfaceError::Denied,
438                        _ => types::IfaceError::Unavailable,
439                    };
440                    Ok(Err(iface_err))
441                }
442            }
443        }
444    }
445
446    fn state_get(
447        &mut self,
448        key: iface_types::StateKey,
449        ctx: Option<types::TenantCtx>,
450    ) -> WasmResult<Result<String, types::IfaceError>> {
451        let store = match self.state_store_handle() {
452            Ok(store) => store,
453            Err(err) => return Ok(Err(err)),
454        };
455        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
456            Ok(ctx) => ctx,
457            Err(err) => {
458                tracing::warn!(error = %err, "invalid tenant context for state.get");
459                return Ok(Err(types::IfaceError::InvalidArg));
460            }
461        };
462        let key = StoreStateKey::from(key);
463        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
464            Ok(Some(value)) => {
465                let result = if let Some(text) = value.as_str() {
466                    text.to_string()
467                } else {
468                    serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
469                };
470                Ok(Ok(result))
471            }
472            Ok(None) => Ok(Err(types::IfaceError::NotFound)),
473            Err(err) => {
474                tracing::warn!(error = %err, "state.get failed");
475                Ok(Err(types::IfaceError::Internal))
476            }
477        }
478    }
479
480    fn state_set(
481        &mut self,
482        key: iface_types::StateKey,
483        value_json: String,
484        ctx: Option<types::TenantCtx>,
485    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
486        let store = match self.state_store_handle() {
487            Ok(store) => store,
488            Err(err) => return Ok(Err(err)),
489        };
490        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
491            Ok(ctx) => ctx,
492            Err(err) => {
493                tracing::warn!(error = %err, "invalid tenant context for state.set");
494                return Ok(Err(types::IfaceError::InvalidArg));
495            }
496        };
497        let key = StoreStateKey::from(key);
498        let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
499        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
500            Ok(()) => Ok(Ok(state::OpAck::Ok)),
501            Err(err) => {
502                tracing::warn!(error = %err, "state.set failed");
503                Ok(Err(types::IfaceError::Internal))
504            }
505        }
506    }
507
508    fn session_update(
509        &mut self,
510        cursor: iface_types::SessionCursor,
511        ctx: Option<types::TenantCtx>,
512    ) -> WasmResult<Result<String, types::IfaceError>> {
513        let store = match self.session_store_handle() {
514            Ok(store) => store,
515            Err(err) => return Ok(Err(err)),
516        };
517        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
518            Ok(ctx) => ctx,
519            Err(err) => {
520                tracing::warn!(error = %err, "invalid tenant context for session.update");
521                return Ok(Err(types::IfaceError::InvalidArg));
522            }
523        };
524        let user = match Self::ensure_user(&tenant_ctx) {
525            Ok(user) => user,
526            Err(err) => return Ok(Err(err)),
527        };
528        let flow_id = match Self::ensure_flow(&tenant_ctx) {
529            Ok(flow) => flow,
530            Err(err) => return Ok(Err(err)),
531        };
532        let cursor = Self::cursor_from_iface(cursor);
533        let payload = SessionData {
534            tenant_ctx: tenant_ctx.clone(),
535            flow_id,
536            cursor: cursor.clone(),
537            context_json: serde_json::json!({
538                "node_pointer": cursor.node_pointer,
539                "wait_reason": cursor.wait_reason,
540                "outbox_marker": cursor.outbox_marker,
541            })
542            .to_string(),
543        };
544        if let Some(existing) = tenant_ctx.session_id() {
545            let key = StoreSessionKey::from(existing.to_string());
546            if let Err(err) = store.update_session(&key, payload) {
547                tracing::error!(error = %err, "failed to update session snapshot");
548                return Ok(Err(types::IfaceError::Internal));
549            }
550            return Ok(Ok(existing.to_string()));
551        }
552        match store.find_by_user(&tenant_ctx, &user) {
553            Ok(Some((key, _))) => {
554                if let Err(err) = store.update_session(&key, payload) {
555                    tracing::error!(error = %err, "failed to update existing user session");
556                    return Ok(Err(types::IfaceError::Internal));
557                }
558                return Ok(Ok(key.to_string()));
559            }
560            Ok(None) => {}
561            Err(err) => {
562                tracing::error!(error = %err, "session lookup failed");
563                return Ok(Err(types::IfaceError::Internal));
564            }
565        }
566        let key = match store.create_session(&tenant_ctx, payload.clone()) {
567            Ok(key) => key,
568            Err(err) => {
569                tracing::error!(error = %err, "failed to create session");
570                return Ok(Err(types::IfaceError::Internal));
571            }
572        };
573        let ctx_with_session = tenant_ctx.with_session(key.to_string());
574        let updated_payload = SessionData {
575            tenant_ctx: ctx_with_session.clone(),
576            ..payload
577        };
578        if let Err(err) = store.update_session(&key, updated_payload) {
579            tracing::warn!(error = %err, "failed to stamp session id after create");
580        }
581        Ok(Ok(key.to_string()))
582    }
583}
584
585impl greentic_interfaces::host_import_v0_2::HostImports for HostState {
586    fn secrets_get(
587        &mut self,
588        key: String,
589        _ctx: Option<LegacyTenantCtx>,
590    ) -> WasmResult<Result<String, LegacyIfaceError>> {
591        Ok(self.get_secret(&key).map_err(|err| {
592            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
593            LegacyIfaceError::Denied
594        }))
595    }
596
597    fn telemetry_emit(
598        &mut self,
599        span_json: String,
600        _ctx: Option<LegacyTenantCtx>,
601    ) -> WasmResult<()> {
602        if let Some(mock) = &self.mocks
603            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
604        {
605            return Ok(());
606        }
607        tracing::info!(span = %span_json, "telemetry emit from pack");
608        Ok(())
609    }
610
611    fn tool_invoke(
612        &mut self,
613        tool: String,
614        action: String,
615        args_json: String,
616        ctx: Option<LegacyTenantCtx>,
617    ) -> WasmResult<Result<String, LegacyIfaceError>> {
618        #[cfg(not(feature = "mcp"))]
619        {
620            let _ = (tool, action, args_json, ctx);
621            tracing::warn!("tool invoke requested but crate built without `mcp` feature");
622            return Ok(Err(LegacyIfaceError::Unavailable));
623        }
624
625        #[cfg(feature = "mcp")]
626        {
627            let exec_config = match &self.exec_config {
628                Some(cfg) => cfg.clone(),
629                None => {
630                    tracing::warn!(%tool, %action, "exec config unavailable for tool invoke");
631                    return Ok(Err(LegacyIfaceError::Unavailable));
632                }
633            };
634
635            let args: Value = match serde_json::from_str(&args_json) {
636                Ok(value) => value,
637                Err(err) => {
638                    tracing::warn!(error = %err, "invalid args for tool invoke");
639                    return Ok(Err(LegacyIfaceError::InvalidArg));
640                }
641            };
642
643            let tenant = ctx.map(|ctx| map_legacy_tenant_ctx(ctx, &self.default_env));
644
645            let request = ExecRequest {
646                component: tool.clone(),
647                action: action.clone(),
648                args,
649                tenant,
650            };
651
652            if let Some(mock) = &self.mocks
653                && let Some(result) = mock.tool_short_circuit(&tool, &action)
654            {
655                return match result.and_then(|value| {
656                    serde_json::to_string(&value)
657                        .map_err(|err| anyhow!("failed to serialise mock tool output: {err}"))
658                }) {
659                    Ok(body) => Ok(Ok(body)),
660                    Err(err) => {
661                        tracing::error!(error = %err, "mock tool execution failed");
662                        Ok(Err(LegacyIfaceError::Internal))
663                    }
664                };
665            }
666
667            match greentic_mcp::exec(request, &exec_config) {
668                Ok(value) => match serde_json::to_string(&value) {
669                    Ok(body) => Ok(Ok(body)),
670                    Err(err) => {
671                        tracing::error!(error = %err, "failed to serialise tool result");
672                        Ok(Err(LegacyIfaceError::Internal))
673                    }
674                },
675                Err(err) => {
676                    tracing::warn!(%tool, %action, error = %err, "tool invoke failed");
677                    let iface_err = match err {
678                        ExecError::NotFound { .. } => LegacyIfaceError::NotFound,
679                        ExecError::Tool { .. } => LegacyIfaceError::Denied,
680                        _ => LegacyIfaceError::Unavailable,
681                    };
682                    Ok(Err(iface_err))
683                }
684            }
685        }
686    }
687
688    fn http_fetch(
689        &mut self,
690        req: LegacyHttpRequest,
691        _ctx: Option<LegacyTenantCtx>,
692    ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
693        if !self.config.http_enabled {
694            tracing::warn!(url = %req.url, "http fetch denied by policy");
695            return Ok(Err(LegacyIfaceError::Denied));
696        }
697
698        let mut mock_state = None;
699        let raw_body = req.body.clone();
700        if let Some(mock) = &self.mocks
701            && let Ok(meta) = HttpMockRequest::new(
702                &req.method,
703                &req.url,
704                raw_body.as_deref().map(|body| body.as_bytes()),
705            )
706        {
707            match mock.http_begin(&meta) {
708                HttpDecision::Mock(response) => {
709                    let http = LegacyHttpResponse::from(&response);
710                    return Ok(Ok(http));
711                }
712                HttpDecision::Deny(reason) => {
713                    tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
714                    return Ok(Err(LegacyIfaceError::Denied));
715                }
716                HttpDecision::Passthrough { record } => {
717                    mock_state = Some((meta, record));
718                }
719            }
720        }
721
722        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
723        let mut builder = self.http_client.request(method, &req.url);
724
725        if let Some(headers_json) = req.headers_json.as_ref() {
726            match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
727                Ok(map) => {
728                    for (key, value) in map {
729                        if let Some(val) = value.as_str()
730                            && let Ok(header) =
731                                reqwest::header::HeaderName::from_bytes(key.as_bytes())
732                            && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
733                        {
734                            builder = builder.header(header, header_value);
735                        }
736                    }
737                }
738                Err(err) => {
739                    tracing::warn!(error = %err, "failed to parse headers for http.fetch");
740                }
741            }
742        }
743
744        if let Some(body) = raw_body.clone() {
745            builder = builder.body(body);
746        }
747
748        let response = match builder.send() {
749            Ok(resp) => resp,
750            Err(err) => {
751                tracing::error!(url = %req.url, error = %err, "http fetch failed");
752                return Ok(Err(LegacyIfaceError::Unavailable));
753            }
754        };
755
756        let status = response.status().as_u16();
757        let headers_map = response
758            .headers()
759            .iter()
760            .map(|(k, v)| {
761                (
762                    k.as_str().to_string(),
763                    v.to_str().unwrap_or_default().to_string(),
764                )
765            })
766            .collect::<BTreeMap<_, _>>();
767        let headers_json = serde_json::to_string(&headers_map).ok();
768        let body = response.text().ok();
769
770        if let Some((meta, true)) = mock_state.take()
771            && let Some(mock) = &self.mocks
772        {
773            let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
774            mock.http_record(&meta, &recorded);
775        }
776
777        Ok(Ok(LegacyHttpResponse {
778            status,
779            headers_json,
780            body,
781        }))
782    }
783}
784
785impl PackRuntime {
786    #[allow(clippy::too_many_arguments)]
787    pub async fn load(
788        path: impl AsRef<Path>,
789        config: Arc<HostConfig>,
790        mocks: Option<Arc<MockLayer>>,
791        archive_source: Option<&Path>,
792        session_store: Option<DynSessionStore>,
793        state_store: Option<DynStateStore>,
794        wasi_policy: Arc<RunnerWasiPolicy>,
795        verify_archive: bool,
796    ) -> Result<Self> {
797        let path = path.as_ref();
798        let is_component = path
799            .extension()
800            .and_then(|ext| ext.to_str())
801            .map(|ext| ext.eq_ignore_ascii_case("wasm"))
802            .unwrap_or(false);
803        if verify_archive && is_component {
804            if let Some(archive) = archive_source {
805                verify::verify_pack(archive).await?;
806                tracing::info!(
807                    pack_path = %archive.display(),
808                    "pack verification complete (archive)"
809                );
810            }
811        } else if verify_archive {
812            verify::verify_pack(path).await?;
813            tracing::info!(pack_path = %path.display(), "pack verification complete");
814        }
815        let engine = Engine::default();
816        let wasm_bytes = fs::read(path).await?;
817        let mut metadata =
818            PackMetadata::from_wasm(&wasm_bytes).unwrap_or_else(|| PackMetadata::fallback(path));
819        let mut archive = None;
820        let component = match Component::from_file(&engine, path) {
821            Ok(component) => Some(component),
822            Err(err) => {
823                if let Some(archive_path) = archive_source {
824                    tracing::warn!(
825                        error = %err,
826                        pack = %archive_path.display(),
827                        "component load failed, using manifest archive"
828                    );
829                    let archive_data = ArchiveFlows::from_archive(archive_path)?;
830                    metadata = archive_data.metadata.clone();
831                    archive = Some(archive_data);
832                    None
833                } else {
834                    return Err(err);
835                }
836            }
837        };
838        Ok(Self {
839            path: path.to_path_buf(),
840            config,
841            engine,
842            component,
843            metadata,
844            mocks,
845            archive,
846            session_store,
847            state_store,
848            wasi_policy,
849        })
850    }
851
852    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
853        if let Some(archive) = &self.archive {
854            return Ok(archive.descriptors.clone());
855        }
856        tracing::trace!(
857            tenant = %self.config.tenant,
858            pack_path = %self.path.display(),
859            "listing flows from pack"
860        );
861        let component = self
862            .component
863            .as_ref()
864            .ok_or_else(|| anyhow!("pack component unavailable"))?;
865        let host_state = HostState::new(
866            Arc::clone(&self.config),
867            self.mocks.clone(),
868            self.session_store.clone(),
869            self.state_store.clone(),
870        )?;
871        let mut store = Store::new(
872            &self.engine,
873            ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?,
874        );
875        let mut linker = Linker::new(&self.engine);
876        imports::register_all(&mut linker)?;
877        let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
878        let exports = bindings.greentic_pack_export_exports();
879        let flows_raw = match exports.call_list_flows(&mut store)? {
880            Ok(flows) => flows,
881            Err(err) => {
882                bail!("pack list_flows failed: {err:?}");
883            }
884        };
885        let flows = flows_raw
886            .into_iter()
887            .map(|flow: FlowInfo| {
888                let profile = flow.profile.clone();
889                let version = flow.version.clone();
890                FlowDescriptor {
891                    id: flow.id,
892                    flow_type: flow.flow_type,
893                    profile,
894                    version,
895                    description: Some(format!("{}@{}", flow.profile, flow.version)),
896                }
897            })
898            .collect();
899        Ok(flows)
900    }
901
902    #[allow(dead_code)]
903    pub async fn run_flow(
904        &self,
905        _flow_id: &str,
906        _input: serde_json::Value,
907    ) -> Result<serde_json::Value> {
908        // TODO: dispatch flow execution via Wasmtime
909        Ok(serde_json::json!({}))
910    }
911
912    pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
913        if let Some(archive) = &self.archive {
914            return archive
915                .flows
916                .get(flow_id)
917                .cloned()
918                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in archive"));
919        }
920        let component = self
921            .component
922            .as_ref()
923            .ok_or_else(|| anyhow!("pack component unavailable"))?;
924        let host_state = HostState::new(
925            Arc::clone(&self.config),
926            self.mocks.clone(),
927            self.session_store.clone(),
928            self.state_store.clone(),
929        )?;
930        let mut store = Store::new(
931            &self.engine,
932            ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?,
933        );
934        let mut linker = Linker::new(&self.engine);
935        imports::register_all(&mut linker)?;
936        let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
937        let exports = bindings.greentic_pack_export_exports();
938        let flow_name = flow_id.to_string();
939        let metadata = match exports.call_flow_metadata(&mut store, &flow_name)? {
940            Ok(doc) => doc,
941            Err(err) => bail!("pack flow_metadata({flow_id}) failed: {err:?}"),
942        };
943        let flow_doc: greentic_flow::model::FlowDoc = serde_json::from_str(&metadata)
944            .or_else(|_| serde_yaml::from_str(&metadata))
945            .with_context(|| format!("failed to parse flow metadata for {flow_id}"))?;
946        let ir = greentic_flow::to_ir(flow_doc)?;
947        Ok(ir)
948    }
949
950    pub fn metadata(&self) -> &PackMetadata {
951        &self.metadata
952    }
953}
954
/// Converts a legacy tenant context into the current `TypesTenantCtx`.
///
/// The environment comes from `ctx.deployment.runtime`, or `default_env` when that
/// is unset. Team and user ids that fail to parse are dropped rather than reported.
///
/// # Panics
///
/// Panics when the resolved environment or the tenant string is not a valid id.
#[cfg(feature = "mcp")]
fn map_legacy_tenant_ctx(ctx: LegacyTenantCtx, default_env: &str) -> TypesTenantCtx {
    let env_name = match ctx.deployment.runtime {
        Some(runtime) => runtime,
        None => default_env.to_string(),
    };

    let env_id = EnvId::from_str(env_name.as_str()).expect("invalid env id");
    let tenant_id = TenantId::from_str(ctx.tenant.as_str()).expect("invalid tenant id");

    let mut mapped = TypesTenantCtx::new(env_id, tenant_id)
        .with_team(match ctx.team.as_ref() {
            Some(raw_team) => TeamId::from_str(raw_team.as_str()).ok(),
            None => None,
        })
        .with_user(match ctx.user.as_ref() {
            Some(raw_user) => UserId::from_str(raw_user.as_str()).ok(),
            None => None,
        });
    mapped.trace_id = ctx.trace_id;
    mapped
}
978
979fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
980    match err {
981        LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
982        LegacyIfaceError::NotFound => types::IfaceError::NotFound,
983        LegacyIfaceError::Denied => types::IfaceError::Denied,
984        LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
985        LegacyIfaceError::Internal => types::IfaceError::Internal,
986    }
987}
988
/// Flow information recovered from a pack archive's `manifest.cbor`.
struct ArchiveFlows {
    /// One descriptor per flow listed in the manifest, in manifest order.
    descriptors: Vec<FlowDescriptor>,
    /// Stub flow IR for each manifest flow, keyed by flow id.
    flows: HashMap<String, FlowIR>,
    /// Pack metadata derived from the manifest.
    metadata: PackMetadata,
}
994
995impl ArchiveFlows {
996    fn from_archive(path: &Path) -> Result<Self> {
997        let file = File::open(path)
998            .with_context(|| format!("failed to open archive {}", path.display()))?;
999        let mut archive = ZipArchive::new(file)
1000            .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1001        let manifest_bytes = read_entry(&mut archive, "manifest.cbor")?;
1002        let manifest: greentic_pack::builder::PackManifest =
1003            serde_cbor::from_slice(&manifest_bytes).context("manifest.cbor is invalid")?;
1004
1005        let mut flows = HashMap::new();
1006        let mut descriptors = Vec::new();
1007        for flow in &manifest.flows {
1008            let ir = build_stub_flow_ir(&flow.id, &flow.kind);
1009            flows.insert(flow.id.clone(), ir);
1010            descriptors.push(FlowDescriptor {
1011                id: flow.id.clone(),
1012                flow_type: flow.kind.clone(),
1013                profile: manifest.meta.name.clone(),
1014                version: manifest.meta.version.to_string(),
1015                description: Some(flow.kind.clone()),
1016            });
1017        }
1018
1019        Ok(Self {
1020            metadata: PackMetadata::from_manifest(&manifest),
1021            descriptors,
1022            flows,
1023        })
1024    }
1025}
1026
1027fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1028    let mut file = archive
1029        .by_name(name)
1030        .with_context(|| format!("entry {name} missing from archive"))?;
1031    let mut buf = Vec::new();
1032    file.read_to_end(&mut buf)?;
1033    Ok(buf)
1034}
1035
1036fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1037    let mut nodes = IndexMap::new();
1038    nodes.insert(
1039        "complete".into(),
1040        NodeIR {
1041            component: "qa.process".into(),
1042            payload_expr: json!({
1043                "status": "done",
1044                "flow_id": flow_id,
1045            }),
1046            routes: vec![RouteIR {
1047                to: None,
1048                out: true,
1049            }],
1050        },
1051    );
1052    FlowIR {
1053        id: flow_id.to_string(),
1054        flow_type: flow_type.to_string(),
1055        start: Some("complete".into()),
1056        parameters: Value::Object(Default::default()),
1057        nodes,
1058    }
1059}
1060
/// Identifying metadata for a pack.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Pack version, stored as a string.
    pub version: String,
    /// Ids of the pack's entry flows; deserializes to an empty list when absent.
    #[serde(default)]
    pub entry_flows: Vec<String>,
}
1068
1069impl PackMetadata {
1070    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1071        let parser = Parser::new(0);
1072        for payload in parser.parse_all(bytes) {
1073            let payload = payload.ok()?;
1074            match payload {
1075                Payload::CustomSection(section) => {
1076                    if section.name() == "greentic.manifest"
1077                        && let Ok(meta) = Self::from_bytes(section.data())
1078                    {
1079                        return Some(meta);
1080                    }
1081                }
1082                Payload::DataSection(reader) => {
1083                    for segment in reader.into_iter().flatten() {
1084                        if let Ok(meta) = Self::from_bytes(segment.data) {
1085                            return Some(meta);
1086                        }
1087                    }
1088                }
1089                _ => {}
1090            }
1091        }
1092        None
1093    }
1094
1095    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1096        #[derive(Deserialize)]
1097        struct RawManifest {
1098            pack_id: String,
1099            version: String,
1100            #[serde(default)]
1101            entry_flows: Vec<String>,
1102            #[serde(default)]
1103            flows: Vec<RawFlow>,
1104        }
1105
1106        #[derive(Deserialize)]
1107        struct RawFlow {
1108            id: String,
1109        }
1110
1111        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1112        let mut entry_flows = if manifest.entry_flows.is_empty() {
1113            manifest.flows.iter().map(|f| f.id.clone()).collect()
1114        } else {
1115            manifest.entry_flows.clone()
1116        };
1117        entry_flows.retain(|id| !id.is_empty());
1118        Ok(Self {
1119            pack_id: manifest.pack_id,
1120            version: manifest.version,
1121            entry_flows,
1122        })
1123    }
1124
1125    pub fn fallback(path: &Path) -> Self {
1126        let pack_id = path
1127            .file_stem()
1128            .map(|s| s.to_string_lossy().into_owned())
1129            .unwrap_or_else(|| "unknown-pack".to_string());
1130        Self {
1131            pack_id,
1132            version: "0.0.0".to_string(),
1133            entry_flows: Vec::new(),
1134        }
1135    }
1136
1137    pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1138        let entry_flows = if manifest.meta.entry_flows.is_empty() {
1139            manifest
1140                .flows
1141                .iter()
1142                .map(|flow| flow.id.clone())
1143                .collect::<Vec<_>>()
1144        } else {
1145            manifest.meta.entry_flows.clone()
1146        };
1147        Self {
1148            pack_id: manifest.meta.pack_id.clone(),
1149            version: manifest.meta.version.to_string(),
1150            entry_flows,
1151        }
1152    }
1153}
1154
1155impl From<&HttpMockResponse> for LegacyHttpResponse {
1156    fn from(value: &HttpMockResponse) -> Self {
1157        let headers_json = serde_json::to_string(&value.headers).ok();
1158        Self {
1159            status: value.status,
1160            headers_json,
1161            body: value.body.clone(),
1162        }
1163    }
1164}