greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
9use crate::runtime_wasmtime::{Component, Engine, ResourceTable, WasmResult};
10use anyhow::{Context, Result, anyhow, bail};
11use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
12use greentic_interfaces_host::host_import::v0_2 as host_import_v0_2;
13use greentic_interfaces_host::host_import::v0_2::greentic::host_import::imports::{
14    HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
15    IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
16};
17use greentic_interfaces_host::host_import::v0_6::{
18    self as host_import_v0_6, iface_types, state, types,
19};
20#[cfg(feature = "mcp")]
21use greentic_mcp::{ExecConfig, ExecError, ExecRequest};
22use greentic_pack::reader::{SigningPolicy, open_pack};
23use greentic_session::SessionKey as StoreSessionKey;
24use greentic_types::{
25    EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
26    TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
27};
28use indexmap::IndexMap;
29use reqwest::blocking::Client as BlockingClient;
30use serde::{Deserialize, Serialize};
31use serde_cbor;
32use serde_json::{self, Value};
33use serde_yaml_bw as serde_yaml;
34use tokio::fs;
35use wasmparser::{Parser, Payload};
36use zip::ZipArchive;
37
38use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
39
40use crate::config::HostConfig;
41use crate::secrets::{DynSecretsManager, read_secret_blocking};
42use crate::storage::state::STATE_PREFIX;
43use crate::storage::{DynSessionStore, DynStateStore};
44use crate::verify;
45use crate::wasi::RunnerWasiPolicy;
46use tracing::warn;
47use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
48
49#[allow(dead_code)]
50pub struct PackRuntime {
51    path: PathBuf,
52    config: Arc<HostConfig>,
53    engine: Engine,
54    component: Option<Component>,
55    metadata: PackMetadata,
56    mocks: Option<Arc<MockLayer>>,
57    flows: Option<PackFlows>,
58    session_store: Option<DynSessionStore>,
59    state_store: Option<DynStateStore>,
60    wasi_policy: Arc<RunnerWasiPolicy>,
61    secrets: DynSecretsManager,
62    oauth_config: Option<OAuthBrokerConfig>,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct FlowDescriptor {
67    pub id: String,
68    #[serde(rename = "type")]
69    pub flow_type: String,
70    pub profile: String,
71    pub version: String,
72    #[serde(default)]
73    pub description: Option<String>,
74}
75
76pub struct HostState {
77    config: Arc<HostConfig>,
78    http_client: BlockingClient,
79    #[cfg(feature = "mcp")]
80    exec_config: Option<ExecConfig>,
81    default_env: String,
82    session_store: Option<DynSessionStore>,
83    state_store: Option<DynStateStore>,
84    mocks: Option<Arc<MockLayer>>,
85    secrets: DynSecretsManager,
86    oauth_config: Option<OAuthBrokerConfig>,
87    oauth_host: OAuthBrokerHost,
88}
89
90impl HostState {
91    #[allow(clippy::default_constructed_unit_structs)]
92    pub fn new(
93        config: Arc<HostConfig>,
94        mocks: Option<Arc<MockLayer>>,
95        session_store: Option<DynSessionStore>,
96        state_store: Option<DynStateStore>,
97        secrets: DynSecretsManager,
98        oauth_config: Option<OAuthBrokerConfig>,
99    ) -> Result<Self> {
100        let http_client = BlockingClient::builder().build()?;
101        #[cfg(feature = "mcp")]
102        let exec_config = config.mcp_exec_config().ok();
103        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
104        Ok(Self {
105            config,
106            http_client,
107            #[cfg(feature = "mcp")]
108            exec_config,
109            default_env,
110            session_store,
111            state_store,
112            mocks,
113            secrets,
114            oauth_config,
115            oauth_host: OAuthBrokerHost::default(),
116        })
117    }
118
119    pub fn get_secret(&self, key: &str) -> Result<String> {
120        if !self.config.secrets_policy.is_allowed(key) {
121            bail!("secret {key} is not permitted by bindings policy");
122        }
123        if let Some(mock) = &self.mocks
124            && let Some(value) = mock.secrets_lookup(key)
125        {
126            return Ok(value);
127        }
128        let bytes = read_secret_blocking(&self.secrets, key)
129            .context("failed to read secret from manager")?;
130        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
131        Ok(value)
132    }
133
134    fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
135        let tenant_raw = ctx
136            .as_ref()
137            .map(|ctx| ctx.tenant.clone())
138            .unwrap_or_else(|| self.config.tenant.clone());
139        let tenant_id = TenantId::from_str(&tenant_raw)
140            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
141        let env_id = EnvId::from_str(&self.default_env)
142            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
143        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
144        if let Some(ctx) = ctx {
145            if let Some(team) = ctx.team {
146                let team_id =
147                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
148                tenant_ctx = tenant_ctx.with_team(Some(team_id));
149            }
150            if let Some(user) = ctx.user {
151                let user_id =
152                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
153                tenant_ctx = tenant_ctx.with_user(Some(user_id));
154            }
155            if let Some(flow) = ctx.flow_id {
156                tenant_ctx = tenant_ctx.with_flow(flow);
157            }
158            if let Some(node) = ctx.node_id {
159                tenant_ctx = tenant_ctx.with_node(node);
160            }
161            if let Some(provider) = ctx.provider_id {
162                tenant_ctx = tenant_ctx.with_provider(provider);
163            }
164            if let Some(session) = ctx.session_id {
165                tenant_ctx = tenant_ctx.with_session(session);
166            }
167            tenant_ctx.trace_id = ctx.trace_id;
168        }
169        Ok(tenant_ctx)
170    }
171
172    fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
173        self.session_store
174            .as_ref()
175            .cloned()
176            .ok_or(types::IfaceError::Unavailable)
177    }
178
179    fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
180        self.state_store
181            .as_ref()
182            .cloned()
183            .ok_or(types::IfaceError::Unavailable)
184    }
185
186    fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
187        ctx.user_id
188            .clone()
189            .or_else(|| ctx.user.clone())
190            .ok_or(types::IfaceError::InvalidArg)
191    }
192
193    fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
194        let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
195        FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
196    }
197
198    fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
199        let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
200        if let Some(reason) = cursor.wait_reason {
201            store_cursor = store_cursor.with_wait_reason(reason);
202        }
203        if let Some(marker) = cursor.outbox_marker {
204            store_cursor = store_cursor.with_outbox_marker(marker);
205        }
206        store_cursor
207    }
208}
209
210pub struct ComponentState {
211    host: HostState,
212    wasi_ctx: WasiCtx,
213    resource_table: ResourceTable,
214}
215
216impl ComponentState {
217    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
218        let wasi_ctx = policy
219            .instantiate()
220            .context("failed to build WASI context")?;
221        Ok(Self {
222            host,
223            wasi_ctx,
224            resource_table: ResourceTable::new(),
225        })
226    }
227
228    fn host_mut(&mut self) -> &mut HostState {
229        &mut self.host
230    }
231}
232
233impl OAuthHostContext for ComponentState {
234    fn tenant_id(&self) -> &str {
235        &self.host.config.tenant
236    }
237
238    fn env(&self) -> &str {
239        &self.host.default_env
240    }
241
242    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
243        &mut self.host.oauth_host
244    }
245
246    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
247        self.host.oauth_config.as_ref()
248    }
249}
250
251impl WasiView for ComponentState {
252    fn ctx(&mut self) -> WasiCtxView<'_> {
253        WasiCtxView {
254            ctx: &mut self.wasi_ctx,
255            table: &mut self.resource_table,
256        }
257    }
258}
259
260#[allow(unsafe_code)]
261unsafe impl Send for ComponentState {}
262#[allow(unsafe_code)]
263unsafe impl Sync for ComponentState {}
264
265impl host_import_v0_6::HostImports for ComponentState {
266    fn secrets_get(
267        &mut self,
268        key: String,
269        ctx: Option<types::TenantCtx>,
270    ) -> WasmResult<Result<String, types::IfaceError>> {
271        host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
272    }
273
274    fn telemetry_emit(
275        &mut self,
276        span_json: String,
277        ctx: Option<types::TenantCtx>,
278    ) -> WasmResult<()> {
279        host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
280    }
281
282    fn http_fetch(
283        &mut self,
284        req: host_import_v0_6::http::HttpRequest,
285        ctx: Option<types::TenantCtx>,
286    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
287        host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
288    }
289
290    fn mcp_exec(
291        &mut self,
292        component: String,
293        action: String,
294        args_json: String,
295        ctx: Option<types::TenantCtx>,
296    ) -> WasmResult<Result<String, types::IfaceError>> {
297        host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
298    }
299
300    fn state_get(
301        &mut self,
302        key: iface_types::StateKey,
303        ctx: Option<types::TenantCtx>,
304    ) -> WasmResult<Result<String, types::IfaceError>> {
305        host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
306    }
307
308    fn state_set(
309        &mut self,
310        key: iface_types::StateKey,
311        value_json: String,
312        ctx: Option<types::TenantCtx>,
313    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
314        host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
315    }
316
317    fn session_update(
318        &mut self,
319        cursor: iface_types::SessionCursor,
320        ctx: Option<types::TenantCtx>,
321    ) -> WasmResult<Result<String, types::IfaceError>> {
322        host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
323    }
324}
325
326impl host_import_v0_2::HostImports for ComponentState {
327    fn secrets_get(
328        &mut self,
329        key: String,
330        ctx: Option<LegacyTenantCtx>,
331    ) -> WasmResult<Result<String, LegacyIfaceError>> {
332        host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
333    }
334
335    fn telemetry_emit(
336        &mut self,
337        span_json: String,
338        ctx: Option<LegacyTenantCtx>,
339    ) -> WasmResult<()> {
340        host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
341    }
342
343    fn tool_invoke(
344        &mut self,
345        tool: String,
346        action: String,
347        args_json: String,
348        ctx: Option<LegacyTenantCtx>,
349    ) -> WasmResult<Result<String, LegacyIfaceError>> {
350        host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
351    }
352
353    fn http_fetch(
354        &mut self,
355        req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
356        ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
357    ) -> WasmResult<
358        Result<
359            host_import_v0_2::greentic::host_import::imports::HttpResponse,
360            host_import_v0_2::greentic::host_import::imports::IfaceError,
361        >,
362    > {
363        host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
364    }
365}
366
367impl host_import_v0_6::HostImports for HostState {
368    fn secrets_get(
369        &mut self,
370        key: String,
371        _ctx: Option<types::TenantCtx>,
372    ) -> WasmResult<Result<String, types::IfaceError>> {
373        Ok(self.get_secret(&key).map_err(|err| {
374            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
375            types::IfaceError::Denied
376        }))
377    }
378
379    fn telemetry_emit(
380        &mut self,
381        span_json: String,
382        _ctx: Option<types::TenantCtx>,
383    ) -> WasmResult<()> {
384        if let Some(mock) = &self.mocks
385            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
386        {
387            return Ok(());
388        }
389        tracing::info!(span = %span_json, "telemetry emit from pack");
390        Ok(())
391    }
392
393    fn http_fetch(
394        &mut self,
395        req: host_import_v0_6::http::HttpRequest,
396        _ctx: Option<types::TenantCtx>,
397    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
398        let legacy_req = LegacyHttpRequest {
399            method: req.method,
400            url: req.url,
401            headers_json: req.headers_json,
402            body: req.body,
403        };
404        match host_import_v0_2::HostImports::http_fetch(self, legacy_req, None)? {
405            Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
406                status: resp.status,
407                headers_json: resp.headers_json,
408                body: resp.body,
409            })),
410            Err(err) => Ok(Err(map_legacy_error(err))),
411        }
412    }
413
414    fn mcp_exec(
415        &mut self,
416        component: String,
417        action: String,
418        args_json: String,
419        ctx: Option<types::TenantCtx>,
420    ) -> WasmResult<Result<String, types::IfaceError>> {
421        #[cfg(not(feature = "mcp"))]
422        {
423            let _ = (component, action, args_json, ctx);
424            tracing::warn!("mcp.exec requested but crate built without `mcp` feature");
425            return Ok(Err(types::IfaceError::Unavailable));
426        }
427        #[cfg(feature = "mcp")]
428        {
429            let exec_config = match &self.exec_config {
430                Some(cfg) => cfg.clone(),
431                None => {
432                    tracing::warn!(component = %component, action = %action, "exec config unavailable for tool invoke");
433                    return Ok(Err(types::IfaceError::Unavailable));
434                }
435            };
436            let args: Value = match serde_json::from_str(&args_json) {
437                Ok(value) => value,
438                Err(err) => {
439                    tracing::warn!(error = %err, "invalid args for mcp.exec node");
440                    return Ok(Err(types::IfaceError::InvalidArg));
441                }
442            };
443            let tenant = match self.tenant_ctx_from_v6(ctx) {
444                Ok(ctx) => Some(ctx),
445                Err(err) => {
446                    tracing::warn!(error = %err, "failed to parse tenant context for mcp.exec");
447                    None
448                }
449            };
450            let request = ExecRequest {
451                component: component.clone(),
452                action: action.clone(),
453                args,
454                tenant,
455            };
456            match greentic_mcp::exec(request, &exec_config) {
457                Ok(value) => match serde_json::to_string(&value) {
458                    Ok(body) => Ok(Ok(body)),
459                    Err(err) => {
460                        tracing::error!(error = %err, "failed to serialise tool result");
461                        Ok(Err(types::IfaceError::Internal))
462                    }
463                },
464                Err(err) => {
465                    tracing::warn!(component = %component, action = %action, error = %err, "mcp exec failed");
466                    let iface_err = match err {
467                        ExecError::NotFound { .. } => types::IfaceError::NotFound,
468                        ExecError::Tool { .. } => types::IfaceError::Denied,
469                        _ => types::IfaceError::Unavailable,
470                    };
471                    Ok(Err(iface_err))
472                }
473            }
474        }
475    }
476
477    fn state_get(
478        &mut self,
479        key: iface_types::StateKey,
480        ctx: Option<types::TenantCtx>,
481    ) -> WasmResult<Result<String, types::IfaceError>> {
482        let store = match self.state_store_handle() {
483            Ok(store) => store,
484            Err(err) => return Ok(Err(err)),
485        };
486        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
487            Ok(ctx) => ctx,
488            Err(err) => {
489                tracing::warn!(error = %err, "invalid tenant context for state.get");
490                return Ok(Err(types::IfaceError::InvalidArg));
491            }
492        };
493        let key = StoreStateKey::from(key);
494        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
495            Ok(Some(value)) => {
496                let result = if let Some(text) = value.as_str() {
497                    text.to_string()
498                } else {
499                    serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
500                };
501                Ok(Ok(result))
502            }
503            Ok(None) => Ok(Err(types::IfaceError::NotFound)),
504            Err(err) => {
505                tracing::warn!(error = %err, "state.get failed");
506                Ok(Err(types::IfaceError::Internal))
507            }
508        }
509    }
510
511    fn state_set(
512        &mut self,
513        key: iface_types::StateKey,
514        value_json: String,
515        ctx: Option<types::TenantCtx>,
516    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
517        let store = match self.state_store_handle() {
518            Ok(store) => store,
519            Err(err) => return Ok(Err(err)),
520        };
521        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
522            Ok(ctx) => ctx,
523            Err(err) => {
524                tracing::warn!(error = %err, "invalid tenant context for state.set");
525                return Ok(Err(types::IfaceError::InvalidArg));
526            }
527        };
528        let key = StoreStateKey::from(key);
529        let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
530        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
531            Ok(()) => Ok(Ok(state::OpAck::Ok)),
532            Err(err) => {
533                tracing::warn!(error = %err, "state.set failed");
534                Ok(Err(types::IfaceError::Internal))
535            }
536        }
537    }
538
539    fn session_update(
540        &mut self,
541        cursor: iface_types::SessionCursor,
542        ctx: Option<types::TenantCtx>,
543    ) -> WasmResult<Result<String, types::IfaceError>> {
544        let store = match self.session_store_handle() {
545            Ok(store) => store,
546            Err(err) => return Ok(Err(err)),
547        };
548        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
549            Ok(ctx) => ctx,
550            Err(err) => {
551                tracing::warn!(error = %err, "invalid tenant context for session.update");
552                return Ok(Err(types::IfaceError::InvalidArg));
553            }
554        };
555        let user = match Self::ensure_user(&tenant_ctx) {
556            Ok(user) => user,
557            Err(err) => return Ok(Err(err)),
558        };
559        let flow_id = match Self::ensure_flow(&tenant_ctx) {
560            Ok(flow) => flow,
561            Err(err) => return Ok(Err(err)),
562        };
563        let cursor = Self::cursor_from_iface(cursor);
564        let payload = SessionData {
565            tenant_ctx: tenant_ctx.clone(),
566            flow_id,
567            cursor: cursor.clone(),
568            context_json: serde_json::json!({
569                "node_pointer": cursor.node_pointer,
570                "wait_reason": cursor.wait_reason,
571                "outbox_marker": cursor.outbox_marker,
572            })
573            .to_string(),
574        };
575        if let Some(existing) = tenant_ctx.session_id() {
576            let key = StoreSessionKey::from(existing.to_string());
577            if let Err(err) = store.update_session(&key, payload) {
578                tracing::error!(error = %err, "failed to update session snapshot");
579                return Ok(Err(types::IfaceError::Internal));
580            }
581            return Ok(Ok(existing.to_string()));
582        }
583        match store.find_by_user(&tenant_ctx, &user) {
584            Ok(Some((key, _))) => {
585                if let Err(err) = store.update_session(&key, payload) {
586                    tracing::error!(error = %err, "failed to update existing user session");
587                    return Ok(Err(types::IfaceError::Internal));
588                }
589                return Ok(Ok(key.to_string()));
590            }
591            Ok(None) => {}
592            Err(err) => {
593                tracing::error!(error = %err, "session lookup failed");
594                return Ok(Err(types::IfaceError::Internal));
595            }
596        }
597        let key = match store.create_session(&tenant_ctx, payload.clone()) {
598            Ok(key) => key,
599            Err(err) => {
600                tracing::error!(error = %err, "failed to create session");
601                return Ok(Err(types::IfaceError::Internal));
602            }
603        };
604        let ctx_with_session = tenant_ctx.with_session(key.to_string());
605        let updated_payload = SessionData {
606            tenant_ctx: ctx_with_session.clone(),
607            ..payload
608        };
609        if let Err(err) = store.update_session(&key, updated_payload) {
610            tracing::warn!(error = %err, "failed to stamp session id after create");
611        }
612        Ok(Ok(key.to_string()))
613    }
614}
615
616impl host_import_v0_2::HostImports for HostState {
617    fn secrets_get(
618        &mut self,
619        key: String,
620        _ctx: Option<LegacyTenantCtx>,
621    ) -> WasmResult<Result<String, LegacyIfaceError>> {
622        Ok(self.get_secret(&key).map_err(|err| {
623            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
624            LegacyIfaceError::Denied
625        }))
626    }
627
628    fn telemetry_emit(
629        &mut self,
630        span_json: String,
631        _ctx: Option<LegacyTenantCtx>,
632    ) -> WasmResult<()> {
633        if let Some(mock) = &self.mocks
634            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
635        {
636            return Ok(());
637        }
638        tracing::info!(span = %span_json, "telemetry emit from pack");
639        Ok(())
640    }
641
642    fn tool_invoke(
643        &mut self,
644        tool: String,
645        action: String,
646        args_json: String,
647        ctx: Option<LegacyTenantCtx>,
648    ) -> WasmResult<Result<String, LegacyIfaceError>> {
649        #[cfg(not(feature = "mcp"))]
650        {
651            let _ = (tool, action, args_json, ctx);
652            tracing::warn!("tool invoke requested but crate built without `mcp` feature");
653            return Ok(Err(LegacyIfaceError::Unavailable));
654        }
655
656        #[cfg(feature = "mcp")]
657        {
658            let exec_config = match &self.exec_config {
659                Some(cfg) => cfg.clone(),
660                None => {
661                    tracing::warn!(%tool, %action, "exec config unavailable for tool invoke");
662                    return Ok(Err(LegacyIfaceError::Unavailable));
663                }
664            };
665
666            let args: Value = match serde_json::from_str(&args_json) {
667                Ok(value) => value,
668                Err(err) => {
669                    tracing::warn!(error = %err, "invalid args for tool invoke");
670                    return Ok(Err(LegacyIfaceError::InvalidArg));
671                }
672            };
673
674            let tenant = ctx.map(|ctx| map_legacy_tenant_ctx(ctx, &self.default_env));
675
676            let request = ExecRequest {
677                component: tool.clone(),
678                action: action.clone(),
679                args,
680                tenant,
681            };
682
683            if let Some(mock) = &self.mocks
684                && let Some(result) = mock.tool_short_circuit(&tool, &action)
685            {
686                return match result.and_then(|value| {
687                    serde_json::to_string(&value)
688                        .map_err(|err| anyhow!("failed to serialise mock tool output: {err}"))
689                }) {
690                    Ok(body) => Ok(Ok(body)),
691                    Err(err) => {
692                        tracing::error!(error = %err, "mock tool execution failed");
693                        Ok(Err(LegacyIfaceError::Internal))
694                    }
695                };
696            }
697
698            match greentic_mcp::exec(request, &exec_config) {
699                Ok(value) => match serde_json::to_string(&value) {
700                    Ok(body) => Ok(Ok(body)),
701                    Err(err) => {
702                        tracing::error!(error = %err, "failed to serialise tool result");
703                        Ok(Err(LegacyIfaceError::Internal))
704                    }
705                },
706                Err(err) => {
707                    tracing::warn!(%tool, %action, error = %err, "tool invoke failed");
708                    let iface_err = match err {
709                        ExecError::NotFound { .. } => LegacyIfaceError::NotFound,
710                        ExecError::Tool { .. } => LegacyIfaceError::Denied,
711                        _ => LegacyIfaceError::Unavailable,
712                    };
713                    Ok(Err(iface_err))
714                }
715            }
716        }
717    }
718
719    fn http_fetch(
720        &mut self,
721        req: LegacyHttpRequest,
722        _ctx: Option<LegacyTenantCtx>,
723    ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
724        if !self.config.http_enabled {
725            tracing::warn!(url = %req.url, "http fetch denied by policy");
726            return Ok(Err(LegacyIfaceError::Denied));
727        }
728
729        let mut mock_state = None;
730        let raw_body = req.body.clone();
731        if let Some(mock) = &self.mocks
732            && let Ok(meta) = HttpMockRequest::new(
733                &req.method,
734                &req.url,
735                raw_body.as_deref().map(|body| body.as_bytes()),
736            )
737        {
738            match mock.http_begin(&meta) {
739                HttpDecision::Mock(response) => {
740                    let http = LegacyHttpResponse::from(&response);
741                    return Ok(Ok(http));
742                }
743                HttpDecision::Deny(reason) => {
744                    tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
745                    return Ok(Err(LegacyIfaceError::Denied));
746                }
747                HttpDecision::Passthrough { record } => {
748                    mock_state = Some((meta, record));
749                }
750            }
751        }
752
753        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
754        let mut builder = self.http_client.request(method, &req.url);
755
756        if let Some(headers_json) = req.headers_json.as_ref() {
757            match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
758                Ok(map) => {
759                    for (key, value) in map {
760                        if let Some(val) = value.as_str()
761                            && let Ok(header) =
762                                reqwest::header::HeaderName::from_bytes(key.as_bytes())
763                            && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
764                        {
765                            builder = builder.header(header, header_value);
766                        }
767                    }
768                }
769                Err(err) => {
770                    tracing::warn!(error = %err, "failed to parse headers for http.fetch");
771                }
772            }
773        }
774
775        if let Some(body) = raw_body.clone() {
776            builder = builder.body(body);
777        }
778
779        let response = match builder.send() {
780            Ok(resp) => resp,
781            Err(err) => {
782                tracing::error!(url = %req.url, error = %err, "http fetch failed");
783                return Ok(Err(LegacyIfaceError::Unavailable));
784            }
785        };
786
787        let status = response.status().as_u16();
788        let headers_map = response
789            .headers()
790            .iter()
791            .map(|(k, v)| {
792                (
793                    k.as_str().to_string(),
794                    v.to_str().unwrap_or_default().to_string(),
795                )
796            })
797            .collect::<BTreeMap<_, _>>();
798        let headers_json = serde_json::to_string(&headers_map).ok();
799        let body = response.text().ok();
800
801        if let Some((meta, true)) = mock_state.take()
802            && let Some(mock) = &self.mocks
803        {
804            let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
805            mock.http_record(&meta, &recorded);
806        }
807
808        Ok(Ok(LegacyHttpResponse {
809            status,
810            headers_json,
811            body,
812        }))
813    }
814}
815
816impl PackRuntime {
817    #[allow(clippy::too_many_arguments)]
818    pub async fn load(
819        path: impl AsRef<Path>,
820        config: Arc<HostConfig>,
821        mocks: Option<Arc<MockLayer>>,
822        archive_source: Option<&Path>,
823        session_store: Option<DynSessionStore>,
824        state_store: Option<DynStateStore>,
825        wasi_policy: Arc<RunnerWasiPolicy>,
826        secrets: DynSecretsManager,
827        oauth_config: Option<OAuthBrokerConfig>,
828        verify_archive: bool,
829    ) -> Result<Self> {
830        let path = path.as_ref();
831        let is_component = path
832            .extension()
833            .and_then(|ext| ext.to_str())
834            .map(|ext| ext.eq_ignore_ascii_case("wasm"))
835            .unwrap_or(false);
836        let archive_hint = archive_source.or(if is_component { None } else { Some(path) });
837        if verify_archive {
838            let verify_target = archive_hint.unwrap_or(path);
839            verify::verify_pack(verify_target).await?;
840            tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
841        }
842        let engine = Engine::default();
843        let wasm_bytes = fs::read(path).await?;
844        let mut metadata =
845            PackMetadata::from_wasm(&wasm_bytes).unwrap_or_else(|| PackMetadata::fallback(path));
846        let mut flows = if let Some(archive_path) = archive_hint {
847            match PackFlows::from_archive(archive_path) {
848                Ok(cache) => {
849                    metadata = cache.metadata.clone();
850                    Some(cache)
851                }
852                Err(err) => {
853                    warn!(
854                        error = %err,
855                        pack = %archive_path.display(),
856                        "failed to parse pack flows via greentic-pack; falling back to component exports"
857                    );
858                    None
859                }
860            }
861        } else {
862            None
863        };
864        let component = match Component::from_file(&engine, path) {
865            Ok(component) => Some(component),
866            Err(err) => {
867                if let Some(archive_path) = archive_hint {
868                    if flows.is_none() {
869                        let archive_data =
870                            PackFlows::from_archive(archive_path).with_context(|| {
871                                format!(
872                                    "failed to build flow cache from {}",
873                                    archive_path.display()
874                                )
875                            })?;
876                        metadata = archive_data.metadata.clone();
877                        flows = Some(archive_data);
878                    }
879                    warn!(
880                        error = %err,
881                        pack = %archive_path.display(),
882                        "component load failed; continuing with cached flow metadata"
883                    );
884                    None
885                } else {
886                    return Err(err);
887                }
888            }
889        };
890        Ok(Self {
891            path: path.to_path_buf(),
892            config,
893            engine,
894            component,
895            metadata,
896            mocks,
897            flows,
898            session_store,
899            state_store,
900            wasi_policy,
901            secrets,
902            oauth_config,
903        })
904    }
905
906    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
907        if let Some(cache) = &self.flows {
908            return Ok(cache.descriptors.clone());
909        }
910        Ok(Vec::new())
911    }
912
913    #[allow(dead_code)]
914    pub async fn run_flow(
915        &self,
916        _flow_id: &str,
917        _input: serde_json::Value,
918    ) -> Result<serde_json::Value> {
919        // TODO: dispatch flow execution via Wasmtime
920        Ok(serde_json::json!({}))
921    }
922
923    pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
924        if let Some(cache) = &self.flows {
925            return cache
926                .flows
927                .get(flow_id)
928                .cloned()
929                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
930        }
931        bail!("flow '{flow_id}' not available (pack exports disabled)")
932    }
933
934    pub fn metadata(&self) -> &PackMetadata {
935        &self.metadata
936    }
937}
938
939#[cfg(feature = "mcp")]
940fn map_legacy_tenant_ctx(ctx: LegacyTenantCtx, default_env: &str) -> TypesTenantCtx {
941    let env = ctx
942        .deployment
943        .runtime
944        .unwrap_or_else(|| default_env.to_string());
945
946    let env_id = EnvId::from_str(env.as_str()).expect("invalid env id");
947    let tenant_id = TenantId::from_str(ctx.tenant.as_str()).expect("invalid tenant id");
948    let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
949    tenant_ctx = tenant_ctx.with_team(
950        ctx.team
951            .as_ref()
952            .and_then(|team| TeamId::from_str(team.as_str()).ok()),
953    );
954    tenant_ctx = tenant_ctx.with_user(
955        ctx.user
956            .as_ref()
957            .and_then(|user| UserId::from_str(user.as_str()).ok()),
958    );
959    tenant_ctx.trace_id = ctx.trace_id;
960    tenant_ctx
961}
962
963fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
964    match err {
965        LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
966        LegacyIfaceError::NotFound => types::IfaceError::NotFound,
967        LegacyIfaceError::Denied => types::IfaceError::Denied,
968        LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
969        LegacyIfaceError::Internal => types::IfaceError::Internal,
970    }
971}
972
973struct PackFlows {
974    descriptors: Vec<FlowDescriptor>,
975    flows: HashMap<String, FlowIR>,
976    metadata: PackMetadata,
977}
978
979impl PackFlows {
980    fn from_archive(path: &Path) -> Result<Self> {
981        let pack = open_pack(path, SigningPolicy::DevOk)
982            .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
983        let file = File::open(path)
984            .with_context(|| format!("failed to open archive {}", path.display()))?;
985        let mut archive = ZipArchive::new(file)
986            .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
987        let mut flows = HashMap::new();
988        let mut descriptors = Vec::new();
989        for flow in &pack.manifest.flows {
990            match load_flow_entry(&mut archive, flow, &pack.manifest.meta) {
991                Ok((descriptor, ir)) => {
992                    flows.insert(descriptor.id.clone(), ir);
993                    descriptors.push(descriptor);
994                }
995                Err(err) => {
996                    warn!(
997                        error = %err,
998                        flow_id = flow.id,
999                        "failed to parse flow metadata from archive; using stub flow"
1000                    );
1001                    let stub = build_stub_flow_ir(&flow.id, &flow.kind);
1002                    flows.insert(flow.id.clone(), stub.clone());
1003                    descriptors.push(FlowDescriptor {
1004                        id: flow.id.clone(),
1005                        flow_type: flow.kind.clone(),
1006                        profile: pack.manifest.meta.name.clone(),
1007                        version: pack.manifest.meta.version.to_string(),
1008                        description: Some(flow.kind.clone()),
1009                    });
1010                }
1011            }
1012        }
1013
1014        Ok(Self {
1015            metadata: PackMetadata::from_manifest(&pack.manifest),
1016            descriptors,
1017            flows,
1018        })
1019    }
1020}
1021
1022fn load_flow_entry(
1023    archive: &mut ZipArchive<File>,
1024    entry: &greentic_pack::builder::FlowEntry,
1025    meta: &greentic_pack::builder::PackMeta,
1026) -> Result<(FlowDescriptor, FlowIR)> {
1027    let doc = load_flow_doc(archive, entry)?;
1028    let ir = greentic_flow::to_ir(doc.clone())
1029        .with_context(|| format!("failed to build IR for flow {}", doc.id))?;
1030    let descriptor = FlowDescriptor {
1031        id: doc.id.clone(),
1032        flow_type: doc.flow_type.clone(),
1033        profile: meta.name.clone(),
1034        version: meta.version.to_string(),
1035        description: doc
1036            .description
1037            .clone()
1038            .or(doc.title.clone())
1039            .or_else(|| Some(entry.kind.clone())),
1040    };
1041    Ok((descriptor, ir))
1042}
1043
1044fn load_flow_doc(
1045    archive: &mut ZipArchive<File>,
1046    entry: &greentic_pack::builder::FlowEntry,
1047) -> Result<greentic_flow::model::FlowDoc> {
1048    if let Ok(bytes) = read_entry(archive, &entry.file_yaml)
1049        && let Ok(doc) = serde_yaml::from_slice(&bytes)
1050    {
1051        return Ok(doc);
1052    }
1053    let json_bytes = read_entry(archive, &entry.file_json)
1054        .with_context(|| format!("missing flow document {}", entry.file_json))?;
1055    let doc = serde_json::from_slice(&json_bytes)
1056        .with_context(|| format!("failed to parse {}", entry.file_json))?;
1057    Ok(doc)
1058}
1059
1060fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1061    let mut file = archive
1062        .by_name(name)
1063        .with_context(|| format!("entry {name} missing from archive"))?;
1064    let mut buf = Vec::new();
1065    file.read_to_end(&mut buf)?;
1066    Ok(buf)
1067}
1068
1069#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1070pub struct PackMetadata {
1071    pub pack_id: String,
1072    pub version: String,
1073    #[serde(default)]
1074    pub entry_flows: Vec<String>,
1075}
1076
1077impl PackMetadata {
1078    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1079        let parser = Parser::new(0);
1080        for payload in parser.parse_all(bytes) {
1081            let payload = payload.ok()?;
1082            match payload {
1083                Payload::CustomSection(section) => {
1084                    if section.name() == "greentic.manifest"
1085                        && let Ok(meta) = Self::from_bytes(section.data())
1086                    {
1087                        return Some(meta);
1088                    }
1089                }
1090                Payload::DataSection(reader) => {
1091                    for segment in reader.into_iter().flatten() {
1092                        if let Ok(meta) = Self::from_bytes(segment.data) {
1093                            return Some(meta);
1094                        }
1095                    }
1096                }
1097                _ => {}
1098            }
1099        }
1100        None
1101    }
1102
1103    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1104        #[derive(Deserialize)]
1105        struct RawManifest {
1106            pack_id: String,
1107            version: String,
1108            #[serde(default)]
1109            entry_flows: Vec<String>,
1110            #[serde(default)]
1111            flows: Vec<RawFlow>,
1112        }
1113
1114        #[derive(Deserialize)]
1115        struct RawFlow {
1116            id: String,
1117        }
1118
1119        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1120        let mut entry_flows = if manifest.entry_flows.is_empty() {
1121            manifest.flows.iter().map(|f| f.id.clone()).collect()
1122        } else {
1123            manifest.entry_flows.clone()
1124        };
1125        entry_flows.retain(|id| !id.is_empty());
1126        Ok(Self {
1127            pack_id: manifest.pack_id,
1128            version: manifest.version,
1129            entry_flows,
1130        })
1131    }
1132
1133    pub fn fallback(path: &Path) -> Self {
1134        let pack_id = path
1135            .file_stem()
1136            .map(|s| s.to_string_lossy().into_owned())
1137            .unwrap_or_else(|| "unknown-pack".to_string());
1138        Self {
1139            pack_id,
1140            version: "0.0.0".to_string(),
1141            entry_flows: Vec::new(),
1142        }
1143    }
1144
1145    pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1146        let entry_flows = if manifest.meta.entry_flows.is_empty() {
1147            manifest
1148                .flows
1149                .iter()
1150                .map(|flow| flow.id.clone())
1151                .collect::<Vec<_>>()
1152        } else {
1153            manifest.meta.entry_flows.clone()
1154        };
1155        Self {
1156            pack_id: manifest.meta.pack_id.clone(),
1157            version: manifest.meta.version.to_string(),
1158            entry_flows,
1159        }
1160    }
1161}
1162
1163fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1164    let mut nodes = IndexMap::new();
1165    nodes.insert(
1166        "complete".into(),
1167        NodeIR {
1168            component: "qa.process".into(),
1169            payload_expr: serde_json::json!({
1170                "status": "done",
1171                "flow_id": flow_id,
1172            }),
1173            routes: vec![RouteIR {
1174                to: None,
1175                out: true,
1176            }],
1177        },
1178    );
1179    FlowIR {
1180        id: flow_id.to_string(),
1181        flow_type: flow_type.to_string(),
1182        start: Some("complete".into()),
1183        parameters: Value::Object(Default::default()),
1184        nodes,
1185    }
1186}
1187
1188impl From<&HttpMockResponse> for LegacyHttpResponse {
1189    fn from(value: &HttpMockResponse) -> Self {
1190        let headers_json = serde_json::to_string(&value.headers).ok();
1191        Self {
1192            status: value.status,
1193            headers_json,
1194            body: value.body.clone(),
1195        }
1196    }
1197}