greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
9use crate::component_api::{
10    ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::imports::register_all;
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, WasmResult};
15use anyhow::{Context, Result, anyhow, bail};
16use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
17use greentic_interfaces_host::host_import::v0_2 as host_import_v0_2;
18use greentic_interfaces_host::host_import::v0_2::greentic::host_import::imports::{
19    HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
20    IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
21};
22use greentic_interfaces_host::host_import::v0_6::{
23    self as host_import_v0_6, iface_types, state, types,
24};
25use greentic_pack::reader::{SigningPolicy, open_pack};
26use greentic_session::SessionKey as StoreSessionKey;
27use greentic_types::{
28    EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
29    TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
30};
31use indexmap::IndexMap;
32use once_cell::sync::Lazy;
33use parking_lot::Mutex;
34use reqwest::blocking::Client as BlockingClient;
35use runner_core::normalize_under_root;
36use serde::{Deserialize, Serialize};
37use serde_cbor;
38use serde_json::{self, Value};
39use serde_yaml_bw as serde_yaml;
40use tokio::fs;
41use wasmparser::{Parser, Payload};
42use wasmtime::StoreContextMut;
43use zip::ZipArchive;
44
45use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
46
47use crate::config::HostConfig;
48use crate::secrets::{DynSecretsManager, read_secret_blocking};
49use crate::storage::state::STATE_PREFIX;
50use crate::storage::{DynSessionStore, DynStateStore};
51use crate::verify;
52use crate::wasi::RunnerWasiPolicy;
53use tracing::warn;
54use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
55
56#[allow(dead_code)]
57pub struct PackRuntime {
58    /// Component artifact path (wasm file).
59    path: PathBuf,
60    /// Optional archive (.gtpack) used to load flows/manifests.
61    archive_path: Option<PathBuf>,
62    config: Arc<HostConfig>,
63    engine: Engine,
64    metadata: PackMetadata,
65    manifest: Option<greentic_pack::builder::PackManifest>,
66    mocks: Option<Arc<MockLayer>>,
67    flows: Option<PackFlows>,
68    components: HashMap<String, PackComponent>,
69    http_client: Arc<BlockingClient>,
70    pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
71    session_store: Option<DynSessionStore>,
72    state_store: Option<DynStateStore>,
73    wasi_policy: Arc<RunnerWasiPolicy>,
74    secrets: DynSecretsManager,
75    oauth_config: Option<OAuthBrokerConfig>,
76}
77
78struct PackComponent {
79    #[allow(dead_code)]
80    name: String,
81    #[allow(dead_code)]
82    version: String,
83    component: Component,
84}
85
86fn build_blocking_client() -> BlockingClient {
87    std::thread::spawn(|| BlockingClient::builder().build().expect("blocking client"))
88        .join()
89        .expect("client build thread panicked")
90}
91
92fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
93    let (root, candidate) = if path.is_absolute() {
94        let parent = path
95            .parent()
96            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
97        let root = parent
98            .canonicalize()
99            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
100        let file = path
101            .file_name()
102            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
103        (root, PathBuf::from(file))
104    } else {
105        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
106        let base = if let Some(parent) = path.parent() {
107            cwd.join(parent)
108        } else {
109            cwd
110        };
111        let root = base
112            .canonicalize()
113            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
114        let file = path
115            .file_name()
116            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
117        (root, PathBuf::from(file))
118    };
119    let safe = normalize_under_root(&root, &candidate)?;
120    Ok((root, safe))
121}
122
123static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct FlowDescriptor {
127    pub id: String,
128    #[serde(rename = "type")]
129    pub flow_type: String,
130    pub profile: String,
131    pub version: String,
132    #[serde(default)]
133    pub description: Option<String>,
134}
135
136pub struct HostState {
137    config: Arc<HostConfig>,
138    http_client: Arc<BlockingClient>,
139    default_env: String,
140    session_store: Option<DynSessionStore>,
141    state_store: Option<DynStateStore>,
142    mocks: Option<Arc<MockLayer>>,
143    secrets: DynSecretsManager,
144    oauth_config: Option<OAuthBrokerConfig>,
145    oauth_host: OAuthBrokerHost,
146}
147
148impl HostState {
149    #[allow(clippy::default_constructed_unit_structs)]
150    pub fn new(
151        config: Arc<HostConfig>,
152        http_client: Arc<BlockingClient>,
153        mocks: Option<Arc<MockLayer>>,
154        session_store: Option<DynSessionStore>,
155        state_store: Option<DynStateStore>,
156        secrets: DynSecretsManager,
157        oauth_config: Option<OAuthBrokerConfig>,
158    ) -> Result<Self> {
159        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
160        Ok(Self {
161            config,
162            http_client,
163            default_env,
164            session_store,
165            state_store,
166            mocks,
167            secrets,
168            oauth_config,
169            oauth_host: OAuthBrokerHost::default(),
170        })
171    }
172
173    pub fn get_secret(&self, key: &str) -> Result<String> {
174        if !self.config.secrets_policy.is_allowed(key) {
175            bail!("secret {key} is not permitted by bindings policy");
176        }
177        if let Some(mock) = &self.mocks
178            && let Some(value) = mock.secrets_lookup(key)
179        {
180            return Ok(value);
181        }
182        let bytes = read_secret_blocking(&self.secrets, key)
183            .context("failed to read secret from manager")?;
184        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
185        Ok(value)
186    }
187
188    fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
189        let tenant_raw = ctx
190            .as_ref()
191            .map(|ctx| ctx.tenant.clone())
192            .unwrap_or_else(|| self.config.tenant.clone());
193        let tenant_id = TenantId::from_str(&tenant_raw)
194            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
195        let env_id = EnvId::from_str(&self.default_env)
196            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
197        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
198        if let Some(ctx) = ctx {
199            if let Some(team) = ctx.team {
200                let team_id =
201                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
202                tenant_ctx = tenant_ctx.with_team(Some(team_id));
203            }
204            if let Some(user) = ctx.user {
205                let user_id =
206                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
207                tenant_ctx = tenant_ctx.with_user(Some(user_id));
208            }
209            if let Some(flow) = ctx.flow_id {
210                tenant_ctx = tenant_ctx.with_flow(flow);
211            }
212            if let Some(node) = ctx.node_id {
213                tenant_ctx = tenant_ctx.with_node(node);
214            }
215            if let Some(provider) = ctx.provider_id {
216                tenant_ctx = tenant_ctx.with_provider(provider);
217            }
218            if let Some(session) = ctx.session_id {
219                tenant_ctx = tenant_ctx.with_session(session);
220            }
221            tenant_ctx.trace_id = ctx.trace_id;
222        }
223        Ok(tenant_ctx)
224    }
225
226    fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
227        self.session_store
228            .as_ref()
229            .cloned()
230            .ok_or(types::IfaceError::Unavailable)
231    }
232
233    fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
234        self.state_store
235            .as_ref()
236            .cloned()
237            .ok_or(types::IfaceError::Unavailable)
238    }
239
240    fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
241        ctx.user_id
242            .clone()
243            .or_else(|| ctx.user.clone())
244            .ok_or(types::IfaceError::InvalidArg)
245    }
246
247    fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
248        let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
249        FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
250    }
251
252    fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
253        let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
254        if let Some(reason) = cursor.wait_reason {
255            store_cursor = store_cursor.with_wait_reason(reason);
256        }
257        if let Some(marker) = cursor.outbox_marker {
258            store_cursor = store_cursor.with_outbox_marker(marker);
259        }
260        store_cursor
261    }
262}
263
264pub struct ComponentState {
265    host: HostState,
266    wasi_ctx: WasiCtx,
267    resource_table: ResourceTable,
268}
269
270impl ComponentState {
271    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
272        let wasi_ctx = policy
273            .instantiate()
274            .context("failed to build WASI context")?;
275        Ok(Self {
276            host,
277            wasi_ctx,
278            resource_table: ResourceTable::new(),
279        })
280    }
281
282    fn host_mut(&mut self) -> &mut HostState {
283        &mut self.host
284    }
285}
286
287impl control::Host for ComponentState {
288    fn should_cancel(&mut self) -> bool {
289        false
290    }
291
292    fn yield_now(&mut self) {
293        // no-op cooperative yield
294    }
295}
296
297fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
298    let mut inst = linker.instance("greentic:component/control@0.4.0")?;
299    inst.func_wrap(
300        "should-cancel",
301        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
302            let host = caller.data_mut();
303            Ok((ComponentControlHost::should_cancel(host),))
304        },
305    )?;
306    inst.func_wrap(
307        "yield-now",
308        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
309            let host = caller.data_mut();
310            ComponentControlHost::yield_now(host);
311            Ok(())
312        },
313    )?;
314    Ok(())
315}
316
317impl OAuthHostContext for ComponentState {
318    fn tenant_id(&self) -> &str {
319        &self.host.config.tenant
320    }
321
322    fn env(&self) -> &str {
323        &self.host.default_env
324    }
325
326    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
327        &mut self.host.oauth_host
328    }
329
330    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
331        self.host.oauth_config.as_ref()
332    }
333}
334
335impl WasiView for ComponentState {
336    fn ctx(&mut self) -> WasiCtxView<'_> {
337        WasiCtxView {
338            ctx: &mut self.wasi_ctx,
339            table: &mut self.resource_table,
340        }
341    }
342}
343
344#[allow(unsafe_code)]
345unsafe impl Send for ComponentState {}
346#[allow(unsafe_code)]
347unsafe impl Sync for ComponentState {}
348
349impl host_import_v0_6::HostImports for ComponentState {
350    fn secrets_get(
351        &mut self,
352        key: String,
353        ctx: Option<types::TenantCtx>,
354    ) -> WasmResult<Result<String, types::IfaceError>> {
355        host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
356    }
357
358    fn telemetry_emit(
359        &mut self,
360        span_json: String,
361        ctx: Option<types::TenantCtx>,
362    ) -> WasmResult<()> {
363        host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
364    }
365
366    fn http_fetch(
367        &mut self,
368        req: host_import_v0_6::http::HttpRequest,
369        ctx: Option<types::TenantCtx>,
370    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
371        host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
372    }
373
374    fn mcp_exec(
375        &mut self,
376        component: String,
377        action: String,
378        args_json: String,
379        ctx: Option<types::TenantCtx>,
380    ) -> WasmResult<Result<String, types::IfaceError>> {
381        host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
382    }
383
384    fn state_get(
385        &mut self,
386        key: iface_types::StateKey,
387        ctx: Option<types::TenantCtx>,
388    ) -> WasmResult<Result<String, types::IfaceError>> {
389        host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
390    }
391
392    fn state_set(
393        &mut self,
394        key: iface_types::StateKey,
395        value_json: String,
396        ctx: Option<types::TenantCtx>,
397    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
398        host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
399    }
400
401    fn session_update(
402        &mut self,
403        cursor: iface_types::SessionCursor,
404        ctx: Option<types::TenantCtx>,
405    ) -> WasmResult<Result<String, types::IfaceError>> {
406        host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
407    }
408}
409
410impl host_import_v0_2::HostImports for ComponentState {
411    fn secrets_get(
412        &mut self,
413        key: String,
414        ctx: Option<LegacyTenantCtx>,
415    ) -> WasmResult<Result<String, LegacyIfaceError>> {
416        host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
417    }
418
419    fn telemetry_emit(
420        &mut self,
421        span_json: String,
422        ctx: Option<LegacyTenantCtx>,
423    ) -> WasmResult<()> {
424        host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
425    }
426
427    fn tool_invoke(
428        &mut self,
429        tool: String,
430        action: String,
431        args_json: String,
432        ctx: Option<LegacyTenantCtx>,
433    ) -> WasmResult<Result<String, LegacyIfaceError>> {
434        host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
435    }
436
437    fn http_fetch(
438        &mut self,
439        req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
440        ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
441    ) -> WasmResult<
442        Result<
443            host_import_v0_2::greentic::host_import::imports::HttpResponse,
444            host_import_v0_2::greentic::host_import::imports::IfaceError,
445        >,
446    > {
447        host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
448    }
449}
450
451impl host_import_v0_6::HostImports for HostState {
452    fn secrets_get(
453        &mut self,
454        key: String,
455        _ctx: Option<types::TenantCtx>,
456    ) -> WasmResult<Result<String, types::IfaceError>> {
457        Ok(self.get_secret(&key).map_err(|err| {
458            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
459            types::IfaceError::Denied
460        }))
461    }
462
463    fn telemetry_emit(
464        &mut self,
465        span_json: String,
466        _ctx: Option<types::TenantCtx>,
467    ) -> WasmResult<()> {
468        if let Some(mock) = &self.mocks
469            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
470        {
471            return Ok(());
472        }
473        tracing::info!(span = %span_json, "telemetry emit from pack");
474        Ok(())
475    }
476
477    fn http_fetch(
478        &mut self,
479        req: host_import_v0_6::http::HttpRequest,
480        _ctx: Option<types::TenantCtx>,
481    ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
482        let legacy_req = LegacyHttpRequest {
483            method: req.method,
484            url: req.url,
485            headers_json: req.headers_json,
486            body: req.body,
487        };
488        match host_import_v0_2::HostImports::http_fetch(self, legacy_req, None)? {
489            Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
490                status: resp.status,
491                headers_json: resp.headers_json,
492                body: resp.body,
493            })),
494            Err(err) => Ok(Err(map_legacy_error(err))),
495        }
496    }
497
498    fn mcp_exec(
499        &mut self,
500        component: String,
501        action: String,
502        args_json: String,
503        ctx: Option<types::TenantCtx>,
504    ) -> WasmResult<Result<String, types::IfaceError>> {
505        let _ = (component, action, args_json, ctx);
506        tracing::warn!("mcp.exec requested but MCP bridge is removed; returning unavailable");
507        Ok(Err(types::IfaceError::Unavailable))
508    }
509
510    fn state_get(
511        &mut self,
512        key: iface_types::StateKey,
513        ctx: Option<types::TenantCtx>,
514    ) -> WasmResult<Result<String, types::IfaceError>> {
515        let store = match self.state_store_handle() {
516            Ok(store) => store,
517            Err(err) => return Ok(Err(err)),
518        };
519        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
520            Ok(ctx) => ctx,
521            Err(err) => {
522                tracing::warn!(error = %err, "invalid tenant context for state.get");
523                return Ok(Err(types::IfaceError::InvalidArg));
524            }
525        };
526        let key = StoreStateKey::from(key);
527        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
528            Ok(Some(value)) => {
529                let result = if let Some(text) = value.as_str() {
530                    text.to_string()
531                } else {
532                    serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
533                };
534                Ok(Ok(result))
535            }
536            Ok(None) => Ok(Err(types::IfaceError::NotFound)),
537            Err(err) => {
538                tracing::warn!(error = %err, "state.get failed");
539                Ok(Err(types::IfaceError::Internal))
540            }
541        }
542    }
543
544    fn state_set(
545        &mut self,
546        key: iface_types::StateKey,
547        value_json: String,
548        ctx: Option<types::TenantCtx>,
549    ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
550        let store = match self.state_store_handle() {
551            Ok(store) => store,
552            Err(err) => return Ok(Err(err)),
553        };
554        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
555            Ok(ctx) => ctx,
556            Err(err) => {
557                tracing::warn!(error = %err, "invalid tenant context for state.set");
558                return Ok(Err(types::IfaceError::InvalidArg));
559            }
560        };
561        let key = StoreStateKey::from(key);
562        let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
563        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
564            Ok(()) => Ok(Ok(state::OpAck::Ok)),
565            Err(err) => {
566                tracing::warn!(error = %err, "state.set failed");
567                Ok(Err(types::IfaceError::Internal))
568            }
569        }
570    }
571
572    fn session_update(
573        &mut self,
574        cursor: iface_types::SessionCursor,
575        ctx: Option<types::TenantCtx>,
576    ) -> WasmResult<Result<String, types::IfaceError>> {
577        let store = match self.session_store_handle() {
578            Ok(store) => store,
579            Err(err) => return Ok(Err(err)),
580        };
581        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
582            Ok(ctx) => ctx,
583            Err(err) => {
584                tracing::warn!(error = %err, "invalid tenant context for session.update");
585                return Ok(Err(types::IfaceError::InvalidArg));
586            }
587        };
588        let user = match Self::ensure_user(&tenant_ctx) {
589            Ok(user) => user,
590            Err(err) => return Ok(Err(err)),
591        };
592        let flow_id = match Self::ensure_flow(&tenant_ctx) {
593            Ok(flow) => flow,
594            Err(err) => return Ok(Err(err)),
595        };
596        let cursor = Self::cursor_from_iface(cursor);
597        let payload = SessionData {
598            tenant_ctx: tenant_ctx.clone(),
599            flow_id,
600            cursor: cursor.clone(),
601            context_json: serde_json::json!({
602                "node_pointer": cursor.node_pointer,
603                "wait_reason": cursor.wait_reason,
604                "outbox_marker": cursor.outbox_marker,
605            })
606            .to_string(),
607        };
608        if let Some(existing) = tenant_ctx.session_id() {
609            let key = StoreSessionKey::from(existing.to_string());
610            if let Err(err) = store.update_session(&key, payload) {
611                tracing::error!(error = %err, "failed to update session snapshot");
612                return Ok(Err(types::IfaceError::Internal));
613            }
614            return Ok(Ok(existing.to_string()));
615        }
616        match store.find_by_user(&tenant_ctx, &user) {
617            Ok(Some((key, _))) => {
618                if let Err(err) = store.update_session(&key, payload) {
619                    tracing::error!(error = %err, "failed to update existing user session");
620                    return Ok(Err(types::IfaceError::Internal));
621                }
622                return Ok(Ok(key.to_string()));
623            }
624            Ok(None) => {}
625            Err(err) => {
626                tracing::error!(error = %err, "session lookup failed");
627                return Ok(Err(types::IfaceError::Internal));
628            }
629        }
630        let key = match store.create_session(&tenant_ctx, payload.clone()) {
631            Ok(key) => key,
632            Err(err) => {
633                tracing::error!(error = %err, "failed to create session");
634                return Ok(Err(types::IfaceError::Internal));
635            }
636        };
637        let ctx_with_session = tenant_ctx.with_session(key.to_string());
638        let updated_payload = SessionData {
639            tenant_ctx: ctx_with_session.clone(),
640            ..payload
641        };
642        if let Err(err) = store.update_session(&key, updated_payload) {
643            tracing::warn!(error = %err, "failed to stamp session id after create");
644        }
645        Ok(Ok(key.to_string()))
646    }
647}
648
649impl host_import_v0_2::HostImports for HostState {
650    fn secrets_get(
651        &mut self,
652        key: String,
653        _ctx: Option<LegacyTenantCtx>,
654    ) -> WasmResult<Result<String, LegacyIfaceError>> {
655        Ok(self.get_secret(&key).map_err(|err| {
656            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
657            LegacyIfaceError::Denied
658        }))
659    }
660
661    fn telemetry_emit(
662        &mut self,
663        span_json: String,
664        _ctx: Option<LegacyTenantCtx>,
665    ) -> WasmResult<()> {
666        if let Some(mock) = &self.mocks
667            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
668        {
669            return Ok(());
670        }
671        tracing::info!(span = %span_json, "telemetry emit from pack");
672        Ok(())
673    }
674
675    fn tool_invoke(
676        &mut self,
677        tool: String,
678        action: String,
679        args_json: String,
680        ctx: Option<LegacyTenantCtx>,
681    ) -> WasmResult<Result<String, LegacyIfaceError>> {
682        let _ = (tool, action, args_json, ctx);
683        tracing::warn!("tool invoke requested but MCP bridge is removed; returning unavailable");
684        Ok(Err(LegacyIfaceError::Unavailable))
685    }
686
687    fn http_fetch(
688        &mut self,
689        req: LegacyHttpRequest,
690        _ctx: Option<LegacyTenantCtx>,
691    ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
692        if !self.config.http_enabled {
693            tracing::warn!(url = %req.url, "http fetch denied by policy");
694            return Ok(Err(LegacyIfaceError::Denied));
695        }
696
697        let mut mock_state = None;
698        let raw_body = req.body.clone();
699        if let Some(mock) = &self.mocks
700            && let Ok(meta) = HttpMockRequest::new(
701                &req.method,
702                &req.url,
703                raw_body.as_deref().map(|body| body.as_bytes()),
704            )
705        {
706            match mock.http_begin(&meta) {
707                HttpDecision::Mock(response) => {
708                    let http = LegacyHttpResponse::from(&response);
709                    return Ok(Ok(http));
710                }
711                HttpDecision::Deny(reason) => {
712                    tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
713                    return Ok(Err(LegacyIfaceError::Denied));
714                }
715                HttpDecision::Passthrough { record } => {
716                    mock_state = Some((meta, record));
717                }
718            }
719        }
720
721        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
722        let mut builder = self.http_client.request(method, &req.url);
723
724        if let Some(headers_json) = req.headers_json.as_ref() {
725            match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
726                Ok(map) => {
727                    for (key, value) in map {
728                        if let Some(val) = value.as_str()
729                            && let Ok(header) =
730                                reqwest::header::HeaderName::from_bytes(key.as_bytes())
731                            && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
732                        {
733                            builder = builder.header(header, header_value);
734                        }
735                    }
736                }
737                Err(err) => {
738                    tracing::warn!(error = %err, "failed to parse headers for http.fetch");
739                }
740            }
741        }
742
743        if let Some(body) = raw_body.clone() {
744            builder = builder.body(body);
745        }
746
747        let response = match builder.send() {
748            Ok(resp) => resp,
749            Err(err) => {
750                tracing::error!(url = %req.url, error = %err, "http fetch failed");
751                return Ok(Err(LegacyIfaceError::Unavailable));
752            }
753        };
754
755        let status = response.status().as_u16();
756        let headers_map = response
757            .headers()
758            .iter()
759            .map(|(k, v)| {
760                (
761                    k.as_str().to_string(),
762                    v.to_str().unwrap_or_default().to_string(),
763                )
764            })
765            .collect::<BTreeMap<_, _>>();
766        let headers_json = serde_json::to_string(&headers_map).ok();
767        let body = response.text().ok();
768
769        if let Some((meta, true)) = mock_state.take()
770            && let Some(mock) = &self.mocks
771        {
772            let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
773            mock.http_record(&meta, &recorded);
774        }
775
776        Ok(Ok(LegacyHttpResponse {
777            status,
778            headers_json,
779            body,
780        }))
781    }
782}
783
784impl PackRuntime {
785    #[allow(clippy::too_many_arguments)]
786    pub async fn load(
787        path: impl AsRef<Path>,
788        config: Arc<HostConfig>,
789        mocks: Option<Arc<MockLayer>>,
790        archive_source: Option<&Path>,
791        session_store: Option<DynSessionStore>,
792        state_store: Option<DynStateStore>,
793        wasi_policy: Arc<RunnerWasiPolicy>,
794        secrets: DynSecretsManager,
795        oauth_config: Option<OAuthBrokerConfig>,
796        verify_archive: bool,
797    ) -> Result<Self> {
798        let path = path.as_ref();
799        let (_pack_root, safe_path) = normalize_pack_path(path)?;
800        let is_component = safe_path
801            .extension()
802            .and_then(|ext| ext.to_str())
803            .map(|ext| ext.eq_ignore_ascii_case("wasm"))
804            .unwrap_or(false);
805        let archive_hint_path = if let Some(source) = archive_source {
806            let (_, normalized) = normalize_pack_path(source)?;
807            Some(normalized)
808        } else if is_component {
809            None
810        } else {
811            Some(safe_path.clone())
812        };
813        let archive_hint = archive_hint_path.as_deref();
814        if verify_archive {
815            let verify_target = archive_hint.unwrap_or(&safe_path);
816            verify::verify_pack(verify_target).await?;
817            tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
818        }
819        let engine = Engine::default();
820        let wasm_bytes = fs::read(&safe_path).await?;
821        let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
822            .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
823        let mut manifest = None;
824        let flows = if let Some(archive_path) = archive_hint {
825            match open_pack(archive_path, SigningPolicy::DevOk) {
826                Ok(pack) => {
827                    metadata = PackMetadata::from_manifest(&pack.manifest);
828                    manifest = Some(pack.manifest);
829                    None
830                }
831                Err(err) => {
832                    warn!(error = ?err, pack = %archive_path.display(), "failed to parse pack manifest; falling back to legacy flow cache");
833                    match PackFlows::from_archive(archive_path) {
834                        Ok(cache) => {
835                            metadata = cache.metadata.clone();
836                            Some(cache)
837                        }
838                        Err(err) => {
839                            warn!(
840                                error = %err,
841                                pack = %archive_path.display(),
842                                "failed to parse pack flows via greentic-pack; falling back to component exports"
843                            );
844                            None
845                        }
846                    }
847                }
848            }
849        } else {
850            None
851        };
852        let components = if let Some(archive_path) = archive_hint {
853            match load_components_from_archive(&engine, archive_path) {
854                Ok(map) => map,
855                Err(err) => {
856                    warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
857                    HashMap::new()
858                }
859            }
860        } else if is_component {
861            let name = safe_path
862                .file_stem()
863                .map(|s| s.to_string_lossy().to_string())
864                .unwrap_or_else(|| "component".to_string());
865            let component = Component::from_binary(&engine, &wasm_bytes)?;
866            let mut map = HashMap::new();
867            map.insert(
868                name.clone(),
869                PackComponent {
870                    name,
871                    version: metadata.version.clone(),
872                    component,
873                },
874            );
875            map
876        } else {
877            HashMap::new()
878        };
879        let http_client = Arc::clone(&HTTP_CLIENT);
880        Ok(Self {
881            path: safe_path,
882            archive_path: archive_hint.map(Path::to_path_buf),
883            config,
884            engine,
885            metadata,
886            manifest,
887            mocks,
888            flows,
889            components,
890            http_client,
891            pre_cache: Mutex::new(HashMap::new()),
892            session_store,
893            state_store,
894            wasi_policy,
895            secrets,
896            oauth_config,
897        })
898    }
899
900    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
901        if let Some(cache) = &self.flows {
902            return Ok(cache.descriptors.clone());
903        }
904        if let Some(manifest) = &self.manifest {
905            let descriptors = manifest
906                .flows
907                .iter()
908                .map(|flow| FlowDescriptor {
909                    id: flow.id.clone(),
910                    flow_type: flow.kind.clone(),
911                    profile: manifest.meta.name.clone(),
912                    version: manifest.meta.version.to_string(),
913                    description: Some(flow.kind.clone()),
914                })
915                .collect();
916            return Ok(descriptors);
917        }
918        Ok(Vec::new())
919    }
920
921    #[allow(dead_code)]
922    pub async fn run_flow(
923        &self,
924        _flow_id: &str,
925        _input: serde_json::Value,
926    ) -> Result<serde_json::Value> {
927        // TODO: dispatch flow execution via Wasmtime
928        Ok(serde_json::json!({}))
929    }
930
931    pub async fn invoke_component(
932        &self,
933        component_ref: &str,
934        ctx: ComponentExecCtx,
935        operation: &str,
936        _config_json: Option<String>,
937        input_json: String,
938    ) -> Result<Value> {
939        let pack_component = self
940            .components
941            .get(component_ref)
942            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
943
944        let pre = if let Some(pre) = self.pre_cache.lock().get(component_ref).cloned() {
945            pre
946        } else {
947            let mut linker = Linker::new(&self.engine);
948            register_all(&mut linker, self.oauth_config.is_some())?;
949            add_component_control_to_linker(&mut linker)?;
950            let pre = ComponentPre::new(
951                linker
952                    .instantiate_pre(&pack_component.component)
953                    .map_err(|err| anyhow!(err))?,
954            )
955            .map_err(|err| anyhow!(err))?;
956            self.pre_cache
957                .lock()
958                .insert(component_ref.to_string(), pre.clone());
959            pre
960        };
961
962        let host_state = HostState::new(
963            Arc::clone(&self.config),
964            Arc::clone(&self.http_client),
965            self.mocks.clone(),
966            self.session_store.clone(),
967            self.state_store.clone(),
968            Arc::clone(&self.secrets),
969            self.oauth_config.clone(),
970        )?;
971        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
972        let mut store = wasmtime::Store::new(&self.engine, store_state);
973        let bindings = pre
974            .instantiate_async(&mut store)
975            .await
976            .map_err(|err| anyhow!(err))?;
977        let node = bindings.greentic_component_node();
978
979        let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
980
981        match result {
982            InvokeResult::Ok(body) => {
983                if body.is_empty() {
984                    return Ok(Value::Null);
985                }
986                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
987            }
988            InvokeResult::Err(NodeError {
989                code,
990                message,
991                retryable,
992                backoff_ms,
993                details,
994            }) => {
995                let mut obj = serde_json::Map::new();
996                obj.insert("ok".into(), Value::Bool(false));
997                let mut error = serde_json::Map::new();
998                error.insert("code".into(), Value::String(code));
999                error.insert("message".into(), Value::String(message));
1000                error.insert("retryable".into(), Value::Bool(retryable));
1001                if let Some(backoff) = backoff_ms {
1002                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1003                }
1004                if let Some(details) = details {
1005                    error.insert(
1006                        "details".into(),
1007                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1008                    );
1009                }
1010                obj.insert("error".into(), Value::Object(error));
1011                Ok(Value::Object(obj))
1012            }
1013        }
1014    }
1015
1016    pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
1017        if let Some(cache) = &self.flows {
1018            return cache
1019                .flows
1020                .get(flow_id)
1021                .cloned()
1022                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1023        }
1024        if let Some(manifest) = &self.manifest {
1025            let entry = manifest
1026                .flows
1027                .iter()
1028                .find(|f| f.id == flow_id)
1029                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1030            let archive_path = self.archive_path.as_ref().unwrap_or(&self.path);
1031            let mut archive = ZipArchive::new(File::open(archive_path)?)?;
1032            match load_flow_doc(&mut archive, entry).and_then(|doc| {
1033                greentic_flow::to_ir(doc.clone())
1034                    .with_context(|| format!("failed to build IR for flow {}", doc.id))
1035            }) {
1036                Ok(ir) => return Ok(ir),
1037                Err(err) => {
1038                    warn!(
1039                        error = %err,
1040                        flow_id = flow_id,
1041                        "failed to parse flow from archive; falling back to stub IR"
1042                    );
1043                    return Ok(build_stub_flow_ir(&entry.id, &entry.kind));
1044                }
1045            }
1046        }
1047        bail!("flow '{flow_id}' not available (pack exports disabled)")
1048    }
1049
1050    pub fn metadata(&self) -> &PackMetadata {
1051        &self.metadata
1052    }
1053
1054    pub fn for_component_test(
1055        components: Vec<(String, PathBuf)>,
1056        flows: HashMap<String, FlowIR>,
1057        config: Arc<HostConfig>,
1058    ) -> Result<Self> {
1059        let engine = Engine::default();
1060        let mut component_map = HashMap::new();
1061        for (name, path) in components {
1062            if !path.exists() {
1063                bail!("component artifact missing: {}", path.display());
1064            }
1065            let wasm_bytes = std::fs::read(&path)?;
1066            let component = Component::from_binary(&engine, &wasm_bytes)
1067                .with_context(|| format!("failed to compile component {}", path.display()))?;
1068            component_map.insert(
1069                name.clone(),
1070                PackComponent {
1071                    name,
1072                    version: "0.0.0".into(),
1073                    component,
1074                },
1075            );
1076        }
1077
1078        let descriptors = flows
1079            .iter()
1080            .map(|(id, ir)| FlowDescriptor {
1081                id: id.clone(),
1082                flow_type: ir.flow_type.clone(),
1083                profile: "test".into(),
1084                version: "0.0.0".into(),
1085                description: None,
1086            })
1087            .collect::<Vec<_>>();
1088        let flows_cache = PackFlows {
1089            descriptors: descriptors.clone(),
1090            flows,
1091            metadata: PackMetadata::fallback(Path::new("component-test")),
1092        };
1093
1094        Ok(Self {
1095            path: PathBuf::new(),
1096            archive_path: None,
1097            config,
1098            engine,
1099            metadata: PackMetadata::fallback(Path::new("component-test")),
1100            manifest: None,
1101            mocks: None,
1102            flows: Some(flows_cache),
1103            components: component_map,
1104            http_client: Arc::clone(&HTTP_CLIENT),
1105            pre_cache: Mutex::new(HashMap::new()),
1106            session_store: None,
1107            state_store: None,
1108            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1109            secrets: crate::secrets::default_manager(),
1110            oauth_config: None,
1111        })
1112    }
1113}
1114
1115fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
1116    match err {
1117        LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
1118        LegacyIfaceError::NotFound => types::IfaceError::NotFound,
1119        LegacyIfaceError::Denied => types::IfaceError::Denied,
1120        LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
1121        LegacyIfaceError::Internal => types::IfaceError::Internal,
1122    }
1123}
1124
1125struct PackFlows {
1126    descriptors: Vec<FlowDescriptor>,
1127    flows: HashMap<String, FlowIR>,
1128    metadata: PackMetadata,
1129}
1130
1131impl PackFlows {
1132    fn from_archive(path: &Path) -> Result<Self> {
1133        let pack = open_pack(path, SigningPolicy::DevOk)
1134            .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1135        let file = File::open(path)
1136            .with_context(|| format!("failed to open archive {}", path.display()))?;
1137        let mut archive = ZipArchive::new(file)
1138            .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1139        let mut flows = HashMap::new();
1140        let mut descriptors = Vec::new();
1141        for flow in &pack.manifest.flows {
1142            match load_flow_entry(&mut archive, flow, &pack.manifest.meta) {
1143                Ok((descriptor, ir)) => {
1144                    flows.insert(descriptor.id.clone(), ir);
1145                    descriptors.push(descriptor);
1146                }
1147                Err(err) => {
1148                    warn!(
1149                        error = %err,
1150                        flow_id = flow.id,
1151                        "failed to parse flow metadata from archive; using stub flow"
1152                    );
1153                    let stub = build_stub_flow_ir(&flow.id, &flow.kind);
1154                    flows.insert(flow.id.clone(), stub.clone());
1155                    descriptors.push(FlowDescriptor {
1156                        id: flow.id.clone(),
1157                        flow_type: flow.kind.clone(),
1158                        profile: pack.manifest.meta.name.clone(),
1159                        version: pack.manifest.meta.version.to_string(),
1160                        description: Some(flow.kind.clone()),
1161                    });
1162                }
1163            }
1164        }
1165
1166        Ok(Self {
1167            metadata: PackMetadata::from_manifest(&pack.manifest),
1168            descriptors,
1169            flows,
1170        })
1171    }
1172}
1173
1174fn load_flow_entry(
1175    archive: &mut ZipArchive<File>,
1176    entry: &greentic_pack::builder::FlowEntry,
1177    meta: &greentic_pack::builder::PackMeta,
1178) -> Result<(FlowDescriptor, FlowIR)> {
1179    let doc = load_flow_doc(archive, entry)?;
1180    let ir = greentic_flow::to_ir(doc.clone())
1181        .with_context(|| format!("failed to build IR for flow {}", doc.id))?;
1182    let descriptor = FlowDescriptor {
1183        id: doc.id.clone(),
1184        flow_type: doc.flow_type.clone(),
1185        profile: meta.name.clone(),
1186        version: meta.version.to_string(),
1187        description: doc
1188            .description
1189            .clone()
1190            .or(doc.title.clone())
1191            .or_else(|| Some(entry.kind.clone())),
1192    };
1193    Ok((descriptor, ir))
1194}
1195
1196fn load_flow_doc(
1197    archive: &mut ZipArchive<File>,
1198    entry: &greentic_pack::builder::FlowEntry,
1199) -> Result<greentic_flow::model::FlowDoc> {
1200    if let Ok(bytes) = read_entry(archive, &entry.file_yaml)
1201        && let Ok(doc) = serde_yaml::from_slice(&bytes)
1202    {
1203        return Ok(doc);
1204    }
1205    let json_bytes = read_entry(archive, &entry.file_json)
1206        .with_context(|| format!("missing flow document {}", entry.file_json))?;
1207    let doc = serde_json::from_slice(&json_bytes)
1208        .with_context(|| format!("failed to parse {}", entry.file_json))?;
1209    Ok(doc)
1210}
1211
1212fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1213    let mut file = archive
1214        .by_name(name)
1215        .with_context(|| format!("entry {name} missing from archive"))?;
1216    let mut buf = Vec::new();
1217    file.read_to_end(&mut buf)?;
1218    Ok(buf)
1219}
1220
1221fn load_components_from_archive(
1222    engine: &Engine,
1223    path: &Path,
1224) -> Result<HashMap<String, PackComponent>> {
1225    let pack = open_pack(path, SigningPolicy::DevOk)
1226        .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1227    let mut archive = ZipArchive::new(File::open(path)?)?;
1228    let mut components = HashMap::new();
1229    for entry in &pack.manifest.components {
1230        let bytes = read_entry(&mut archive, &entry.file_wasm)
1231            .with_context(|| format!("missing component {}", entry.file_wasm))?;
1232        let component = Component::from_binary(engine, &bytes)
1233            .with_context(|| format!("failed to compile component {}", entry.name))?;
1234        components.insert(
1235            entry.name.clone(),
1236            PackComponent {
1237                name: entry.name.clone(),
1238                version: entry.version.to_string(),
1239                component,
1240            },
1241        );
1242    }
1243    Ok(components)
1244}
1245
1246#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1247pub struct PackMetadata {
1248    pub pack_id: String,
1249    pub version: String,
1250    #[serde(default)]
1251    pub entry_flows: Vec<String>,
1252}
1253
1254impl PackMetadata {
1255    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1256        let parser = Parser::new(0);
1257        for payload in parser.parse_all(bytes) {
1258            let payload = payload.ok()?;
1259            match payload {
1260                Payload::CustomSection(section) => {
1261                    if section.name() == "greentic.manifest"
1262                        && let Ok(meta) = Self::from_bytes(section.data())
1263                    {
1264                        return Some(meta);
1265                    }
1266                }
1267                Payload::DataSection(reader) => {
1268                    for segment in reader.into_iter().flatten() {
1269                        if let Ok(meta) = Self::from_bytes(segment.data) {
1270                            return Some(meta);
1271                        }
1272                    }
1273                }
1274                _ => {}
1275            }
1276        }
1277        None
1278    }
1279
1280    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1281        #[derive(Deserialize)]
1282        struct RawManifest {
1283            pack_id: String,
1284            version: String,
1285            #[serde(default)]
1286            entry_flows: Vec<String>,
1287            #[serde(default)]
1288            flows: Vec<RawFlow>,
1289        }
1290
1291        #[derive(Deserialize)]
1292        struct RawFlow {
1293            id: String,
1294        }
1295
1296        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1297        let mut entry_flows = if manifest.entry_flows.is_empty() {
1298            manifest.flows.iter().map(|f| f.id.clone()).collect()
1299        } else {
1300            manifest.entry_flows.clone()
1301        };
1302        entry_flows.retain(|id| !id.is_empty());
1303        Ok(Self {
1304            pack_id: manifest.pack_id,
1305            version: manifest.version,
1306            entry_flows,
1307        })
1308    }
1309
1310    pub fn fallback(path: &Path) -> Self {
1311        let pack_id = path
1312            .file_stem()
1313            .map(|s| s.to_string_lossy().into_owned())
1314            .unwrap_or_else(|| "unknown-pack".to_string());
1315        Self {
1316            pack_id,
1317            version: "0.0.0".to_string(),
1318            entry_flows: Vec::new(),
1319        }
1320    }
1321
1322    pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1323        let entry_flows = if manifest.meta.entry_flows.is_empty() {
1324            manifest
1325                .flows
1326                .iter()
1327                .map(|flow| flow.id.clone())
1328                .collect::<Vec<_>>()
1329        } else {
1330            manifest.meta.entry_flows.clone()
1331        };
1332        Self {
1333            pack_id: manifest.meta.pack_id.clone(),
1334            version: manifest.meta.version.to_string(),
1335            entry_flows,
1336        }
1337    }
1338}
1339
1340fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1341    let mut nodes = IndexMap::new();
1342    nodes.insert(
1343        "complete".into(),
1344        NodeIR {
1345            component: "component.exec".into(),
1346            payload_expr: serde_json::json!({
1347                "component": "qa.process",
1348                "operation": "process",
1349                "input": {
1350                    "status": "done",
1351                    "flow_id": flow_id,
1352                }
1353            }),
1354            routes: vec![RouteIR {
1355                to: None,
1356                out: true,
1357            }],
1358        },
1359    );
1360    FlowIR {
1361        id: flow_id.to_string(),
1362        flow_type: flow_type.to_string(),
1363        start: Some("complete".into()),
1364        parameters: Value::Object(Default::default()),
1365        nodes,
1366    }
1367}
1368
1369impl From<&HttpMockResponse> for LegacyHttpResponse {
1370    fn from(value: &HttpMockResponse) -> Self {
1371        let headers_json = serde_json::to_string(&value.headers).ok();
1372        Self {
1373            status: value.status,
1374            headers_json,
1375            body: value.body.clone(),
1376        }
1377    }
1378}