1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, Store, WasmResult};
9use anyhow::{Context, Result, anyhow, bail};
10use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
11use greentic_interfaces::host_import_v0_2;
12use greentic_interfaces::host_import_v0_2::greentic::host_import::imports::{
13 HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
14 IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
15};
16use greentic_interfaces::host_import_v0_6::{self, iface_types, state, types};
17use greentic_interfaces::pack_export_v0_2;
18use greentic_interfaces::pack_export_v0_2::exports::greentic::pack_export::exports::FlowInfo;
19#[cfg(feature = "mcp")]
20use greentic_mcp::{ExecConfig, ExecError, ExecRequest};
21use greentic_pack::reader::{SigningPolicy, open_pack};
22use greentic_session::SessionKey as StoreSessionKey;
23use greentic_types::{
24 EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
25 TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
26};
27use indexmap::IndexMap;
28use reqwest::blocking::Client as BlockingClient;
29use serde::{Deserialize, Serialize};
30use serde_cbor;
31use serde_json::{self, Value};
32use serde_yaml_bw as serde_yaml;
33use tokio::fs;
34use wasmparser::{Parser, Payload};
35use zip::ZipArchive;
36
37use crate::imports;
38use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
39
40use crate::config::HostConfig;
41use crate::secrets::{DynSecretsManager, read_secret_blocking};
42use crate::storage::state::STATE_PREFIX;
43use crate::storage::{DynSessionStore, DynStateStore};
44use crate::verify;
45use crate::wasi::RunnerWasiPolicy;
46use tracing::warn;
47use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
48
49pub struct PackRuntime {
50 path: PathBuf,
51 config: Arc<HostConfig>,
52 engine: Engine,
53 component: Option<Component>,
54 metadata: PackMetadata,
55 mocks: Option<Arc<MockLayer>>,
56 flows: Option<PackFlows>,
57 session_store: Option<DynSessionStore>,
58 state_store: Option<DynStateStore>,
59 wasi_policy: Arc<RunnerWasiPolicy>,
60 secrets: DynSecretsManager,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct FlowDescriptor {
65 pub id: String,
66 #[serde(rename = "type")]
67 pub flow_type: String,
68 pub profile: String,
69 pub version: String,
70 #[serde(default)]
71 pub description: Option<String>,
72}
73
74pub struct HostState {
75 config: Arc<HostConfig>,
76 http_client: BlockingClient,
77 #[cfg(feature = "mcp")]
78 exec_config: Option<ExecConfig>,
79 default_env: String,
80 session_store: Option<DynSessionStore>,
81 state_store: Option<DynStateStore>,
82 mocks: Option<Arc<MockLayer>>,
83 secrets: DynSecretsManager,
84}
85
86impl HostState {
87 pub fn new(
88 config: Arc<HostConfig>,
89 mocks: Option<Arc<MockLayer>>,
90 session_store: Option<DynSessionStore>,
91 state_store: Option<DynStateStore>,
92 secrets: DynSecretsManager,
93 ) -> Result<Self> {
94 let http_client = BlockingClient::builder().build()?;
95 #[cfg(feature = "mcp")]
96 let exec_config = config.mcp_exec_config().ok();
97 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
98 Ok(Self {
99 config,
100 http_client,
101 #[cfg(feature = "mcp")]
102 exec_config,
103 default_env,
104 session_store,
105 state_store,
106 mocks,
107 secrets,
108 })
109 }
110
111 pub fn get_secret(&self, key: &str) -> Result<String> {
112 if !self.config.secrets_policy.is_allowed(key) {
113 bail!("secret {key} is not permitted by bindings policy");
114 }
115 if let Some(mock) = &self.mocks
116 && let Some(value) = mock.secrets_lookup(key)
117 {
118 return Ok(value);
119 }
120 let bytes = read_secret_blocking(&self.secrets, key)
121 .context("failed to read secret from manager")?;
122 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
123 Ok(value)
124 }
125
126 fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
127 let tenant_raw = ctx
128 .as_ref()
129 .map(|ctx| ctx.tenant.clone())
130 .unwrap_or_else(|| self.config.tenant.clone());
131 let tenant_id = TenantId::from_str(&tenant_raw)
132 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
133 let env_id = EnvId::from_str(&self.default_env)
134 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
135 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
136 if let Some(ctx) = ctx {
137 if let Some(team) = ctx.team {
138 let team_id =
139 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
140 tenant_ctx = tenant_ctx.with_team(Some(team_id));
141 }
142 if let Some(user) = ctx.user {
143 let user_id =
144 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
145 tenant_ctx = tenant_ctx.with_user(Some(user_id));
146 }
147 if let Some(flow) = ctx.flow_id {
148 tenant_ctx = tenant_ctx.with_flow(flow);
149 }
150 if let Some(node) = ctx.node_id {
151 tenant_ctx = tenant_ctx.with_node(node);
152 }
153 if let Some(provider) = ctx.provider_id {
154 tenant_ctx = tenant_ctx.with_provider(provider);
155 }
156 if let Some(session) = ctx.session_id {
157 tenant_ctx = tenant_ctx.with_session(session);
158 }
159 tenant_ctx.trace_id = ctx.trace_id;
160 }
161 Ok(tenant_ctx)
162 }
163
164 fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
165 self.session_store
166 .as_ref()
167 .cloned()
168 .ok_or(types::IfaceError::Unavailable)
169 }
170
171 fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
172 self.state_store
173 .as_ref()
174 .cloned()
175 .ok_or(types::IfaceError::Unavailable)
176 }
177
178 fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
179 ctx.user_id
180 .clone()
181 .or_else(|| ctx.user.clone())
182 .ok_or(types::IfaceError::InvalidArg)
183 }
184
185 fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
186 let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
187 FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
188 }
189
190 fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
191 let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
192 if let Some(reason) = cursor.wait_reason {
193 store_cursor = store_cursor.with_wait_reason(reason);
194 }
195 if let Some(marker) = cursor.outbox_marker {
196 store_cursor = store_cursor.with_outbox_marker(marker);
197 }
198 store_cursor
199 }
200}
201
202pub struct ComponentState {
203 host: HostState,
204 wasi_ctx: WasiCtx,
205 resource_table: ResourceTable,
206}
207
208impl ComponentState {
209 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
210 let wasi_ctx = policy
211 .instantiate()
212 .context("failed to build WASI context")?;
213 Ok(Self {
214 host,
215 wasi_ctx,
216 resource_table: ResourceTable::new(),
217 })
218 }
219
220 fn host_mut(&mut self) -> &mut HostState {
221 &mut self.host
222 }
223}
224
225impl WasiView for ComponentState {
226 fn ctx(&mut self) -> WasiCtxView<'_> {
227 WasiCtxView {
228 ctx: &mut self.wasi_ctx,
229 table: &mut self.resource_table,
230 }
231 }
232}
233
234#[allow(unsafe_code)]
235unsafe impl Send for ComponentState {}
236#[allow(unsafe_code)]
237unsafe impl Sync for ComponentState {}
238
239impl host_import_v0_6::HostImports for ComponentState {
240 fn secrets_get(
241 &mut self,
242 key: String,
243 ctx: Option<types::TenantCtx>,
244 ) -> WasmResult<Result<String, types::IfaceError>> {
245 host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
246 }
247
248 fn telemetry_emit(
249 &mut self,
250 span_json: String,
251 ctx: Option<types::TenantCtx>,
252 ) -> WasmResult<()> {
253 host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
254 }
255
256 fn http_fetch(
257 &mut self,
258 req: host_import_v0_6::http::HttpRequest,
259 ctx: Option<types::TenantCtx>,
260 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
261 host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
262 }
263
264 fn mcp_exec(
265 &mut self,
266 component: String,
267 action: String,
268 args_json: String,
269 ctx: Option<types::TenantCtx>,
270 ) -> WasmResult<Result<String, types::IfaceError>> {
271 host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
272 }
273
274 fn state_get(
275 &mut self,
276 key: iface_types::StateKey,
277 ctx: Option<types::TenantCtx>,
278 ) -> WasmResult<Result<String, types::IfaceError>> {
279 host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
280 }
281
282 fn state_set(
283 &mut self,
284 key: iface_types::StateKey,
285 value_json: String,
286 ctx: Option<types::TenantCtx>,
287 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
288 host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
289 }
290
291 fn session_update(
292 &mut self,
293 cursor: iface_types::SessionCursor,
294 ctx: Option<types::TenantCtx>,
295 ) -> WasmResult<Result<String, types::IfaceError>> {
296 host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
297 }
298}
299
300impl host_import_v0_2::HostImports for ComponentState {
301 fn secrets_get(
302 &mut self,
303 key: String,
304 ctx: Option<LegacyTenantCtx>,
305 ) -> WasmResult<Result<String, LegacyIfaceError>> {
306 host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
307 }
308
309 fn telemetry_emit(
310 &mut self,
311 span_json: String,
312 ctx: Option<LegacyTenantCtx>,
313 ) -> WasmResult<()> {
314 host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
315 }
316
317 fn tool_invoke(
318 &mut self,
319 tool: String,
320 action: String,
321 args_json: String,
322 ctx: Option<LegacyTenantCtx>,
323 ) -> WasmResult<Result<String, LegacyIfaceError>> {
324 host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
325 }
326
327 fn http_fetch(
328 &mut self,
329 req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
330 ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
331 ) -> WasmResult<
332 Result<
333 host_import_v0_2::greentic::host_import::imports::HttpResponse,
334 host_import_v0_2::greentic::host_import::imports::IfaceError,
335 >,
336 > {
337 host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
338 }
339}
340
341impl host_import_v0_6::HostImports for HostState {
342 fn secrets_get(
343 &mut self,
344 key: String,
345 _ctx: Option<types::TenantCtx>,
346 ) -> WasmResult<Result<String, types::IfaceError>> {
347 Ok(self.get_secret(&key).map_err(|err| {
348 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
349 types::IfaceError::Denied
350 }))
351 }
352
353 fn telemetry_emit(
354 &mut self,
355 span_json: String,
356 _ctx: Option<types::TenantCtx>,
357 ) -> WasmResult<()> {
358 if let Some(mock) = &self.mocks
359 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
360 {
361 return Ok(());
362 }
363 tracing::info!(span = %span_json, "telemetry emit from pack");
364 Ok(())
365 }
366
367 fn http_fetch(
368 &mut self,
369 req: host_import_v0_6::http::HttpRequest,
370 _ctx: Option<types::TenantCtx>,
371 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
372 let legacy_req = LegacyHttpRequest {
373 method: req.method,
374 url: req.url,
375 headers_json: req.headers_json,
376 body: req.body,
377 };
378 match greentic_interfaces::host_import_v0_2::HostImports::http_fetch(
379 self, legacy_req, None,
380 )? {
381 Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
382 status: resp.status,
383 headers_json: resp.headers_json,
384 body: resp.body,
385 })),
386 Err(err) => Ok(Err(map_legacy_error(err))),
387 }
388 }
389
390 fn mcp_exec(
391 &mut self,
392 component: String,
393 action: String,
394 args_json: String,
395 ctx: Option<types::TenantCtx>,
396 ) -> WasmResult<Result<String, types::IfaceError>> {
397 #[cfg(not(feature = "mcp"))]
398 {
399 let _ = (component, action, args_json, ctx);
400 tracing::warn!("mcp.exec requested but crate built without `mcp` feature");
401 return Ok(Err(types::IfaceError::Unavailable));
402 }
403 #[cfg(feature = "mcp")]
404 {
405 let exec_config = match &self.exec_config {
406 Some(cfg) => cfg.clone(),
407 None => {
408 tracing::warn!(component = %component, action = %action, "exec config unavailable for tool invoke");
409 return Ok(Err(types::IfaceError::Unavailable));
410 }
411 };
412 let args: Value = match serde_json::from_str(&args_json) {
413 Ok(value) => value,
414 Err(err) => {
415 tracing::warn!(error = %err, "invalid args for mcp.exec node");
416 return Ok(Err(types::IfaceError::InvalidArg));
417 }
418 };
419 let tenant = match self.tenant_ctx_from_v6(ctx) {
420 Ok(ctx) => Some(ctx),
421 Err(err) => {
422 tracing::warn!(error = %err, "failed to parse tenant context for mcp.exec");
423 None
424 }
425 };
426 let request = ExecRequest {
427 component: component.clone(),
428 action: action.clone(),
429 args,
430 tenant,
431 };
432 match greentic_mcp::exec(request, &exec_config) {
433 Ok(value) => match serde_json::to_string(&value) {
434 Ok(body) => Ok(Ok(body)),
435 Err(err) => {
436 tracing::error!(error = %err, "failed to serialise tool result");
437 Ok(Err(types::IfaceError::Internal))
438 }
439 },
440 Err(err) => {
441 tracing::warn!(component = %component, action = %action, error = %err, "mcp exec failed");
442 let iface_err = match err {
443 ExecError::NotFound { .. } => types::IfaceError::NotFound,
444 ExecError::Tool { .. } => types::IfaceError::Denied,
445 _ => types::IfaceError::Unavailable,
446 };
447 Ok(Err(iface_err))
448 }
449 }
450 }
451 }
452
453 fn state_get(
454 &mut self,
455 key: iface_types::StateKey,
456 ctx: Option<types::TenantCtx>,
457 ) -> WasmResult<Result<String, types::IfaceError>> {
458 let store = match self.state_store_handle() {
459 Ok(store) => store,
460 Err(err) => return Ok(Err(err)),
461 };
462 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
463 Ok(ctx) => ctx,
464 Err(err) => {
465 tracing::warn!(error = %err, "invalid tenant context for state.get");
466 return Ok(Err(types::IfaceError::InvalidArg));
467 }
468 };
469 let key = StoreStateKey::from(key);
470 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
471 Ok(Some(value)) => {
472 let result = if let Some(text) = value.as_str() {
473 text.to_string()
474 } else {
475 serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
476 };
477 Ok(Ok(result))
478 }
479 Ok(None) => Ok(Err(types::IfaceError::NotFound)),
480 Err(err) => {
481 tracing::warn!(error = %err, "state.get failed");
482 Ok(Err(types::IfaceError::Internal))
483 }
484 }
485 }
486
487 fn state_set(
488 &mut self,
489 key: iface_types::StateKey,
490 value_json: String,
491 ctx: Option<types::TenantCtx>,
492 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
493 let store = match self.state_store_handle() {
494 Ok(store) => store,
495 Err(err) => return Ok(Err(err)),
496 };
497 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
498 Ok(ctx) => ctx,
499 Err(err) => {
500 tracing::warn!(error = %err, "invalid tenant context for state.set");
501 return Ok(Err(types::IfaceError::InvalidArg));
502 }
503 };
504 let key = StoreStateKey::from(key);
505 let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
506 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
507 Ok(()) => Ok(Ok(state::OpAck::Ok)),
508 Err(err) => {
509 tracing::warn!(error = %err, "state.set failed");
510 Ok(Err(types::IfaceError::Internal))
511 }
512 }
513 }
514
515 fn session_update(
516 &mut self,
517 cursor: iface_types::SessionCursor,
518 ctx: Option<types::TenantCtx>,
519 ) -> WasmResult<Result<String, types::IfaceError>> {
520 let store = match self.session_store_handle() {
521 Ok(store) => store,
522 Err(err) => return Ok(Err(err)),
523 };
524 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
525 Ok(ctx) => ctx,
526 Err(err) => {
527 tracing::warn!(error = %err, "invalid tenant context for session.update");
528 return Ok(Err(types::IfaceError::InvalidArg));
529 }
530 };
531 let user = match Self::ensure_user(&tenant_ctx) {
532 Ok(user) => user,
533 Err(err) => return Ok(Err(err)),
534 };
535 let flow_id = match Self::ensure_flow(&tenant_ctx) {
536 Ok(flow) => flow,
537 Err(err) => return Ok(Err(err)),
538 };
539 let cursor = Self::cursor_from_iface(cursor);
540 let payload = SessionData {
541 tenant_ctx: tenant_ctx.clone(),
542 flow_id,
543 cursor: cursor.clone(),
544 context_json: serde_json::json!({
545 "node_pointer": cursor.node_pointer,
546 "wait_reason": cursor.wait_reason,
547 "outbox_marker": cursor.outbox_marker,
548 })
549 .to_string(),
550 };
551 if let Some(existing) = tenant_ctx.session_id() {
552 let key = StoreSessionKey::from(existing.to_string());
553 if let Err(err) = store.update_session(&key, payload) {
554 tracing::error!(error = %err, "failed to update session snapshot");
555 return Ok(Err(types::IfaceError::Internal));
556 }
557 return Ok(Ok(existing.to_string()));
558 }
559 match store.find_by_user(&tenant_ctx, &user) {
560 Ok(Some((key, _))) => {
561 if let Err(err) = store.update_session(&key, payload) {
562 tracing::error!(error = %err, "failed to update existing user session");
563 return Ok(Err(types::IfaceError::Internal));
564 }
565 return Ok(Ok(key.to_string()));
566 }
567 Ok(None) => {}
568 Err(err) => {
569 tracing::error!(error = %err, "session lookup failed");
570 return Ok(Err(types::IfaceError::Internal));
571 }
572 }
573 let key = match store.create_session(&tenant_ctx, payload.clone()) {
574 Ok(key) => key,
575 Err(err) => {
576 tracing::error!(error = %err, "failed to create session");
577 return Ok(Err(types::IfaceError::Internal));
578 }
579 };
580 let ctx_with_session = tenant_ctx.with_session(key.to_string());
581 let updated_payload = SessionData {
582 tenant_ctx: ctx_with_session.clone(),
583 ..payload
584 };
585 if let Err(err) = store.update_session(&key, updated_payload) {
586 tracing::warn!(error = %err, "failed to stamp session id after create");
587 }
588 Ok(Ok(key.to_string()))
589 }
590}
591
592impl greentic_interfaces::host_import_v0_2::HostImports for HostState {
593 fn secrets_get(
594 &mut self,
595 key: String,
596 _ctx: Option<LegacyTenantCtx>,
597 ) -> WasmResult<Result<String, LegacyIfaceError>> {
598 Ok(self.get_secret(&key).map_err(|err| {
599 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
600 LegacyIfaceError::Denied
601 }))
602 }
603
604 fn telemetry_emit(
605 &mut self,
606 span_json: String,
607 _ctx: Option<LegacyTenantCtx>,
608 ) -> WasmResult<()> {
609 if let Some(mock) = &self.mocks
610 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
611 {
612 return Ok(());
613 }
614 tracing::info!(span = %span_json, "telemetry emit from pack");
615 Ok(())
616 }
617
618 fn tool_invoke(
619 &mut self,
620 tool: String,
621 action: String,
622 args_json: String,
623 ctx: Option<LegacyTenantCtx>,
624 ) -> WasmResult<Result<String, LegacyIfaceError>> {
625 #[cfg(not(feature = "mcp"))]
626 {
627 let _ = (tool, action, args_json, ctx);
628 tracing::warn!("tool invoke requested but crate built without `mcp` feature");
629 return Ok(Err(LegacyIfaceError::Unavailable));
630 }
631
632 #[cfg(feature = "mcp")]
633 {
634 let exec_config = match &self.exec_config {
635 Some(cfg) => cfg.clone(),
636 None => {
637 tracing::warn!(%tool, %action, "exec config unavailable for tool invoke");
638 return Ok(Err(LegacyIfaceError::Unavailable));
639 }
640 };
641
642 let args: Value = match serde_json::from_str(&args_json) {
643 Ok(value) => value,
644 Err(err) => {
645 tracing::warn!(error = %err, "invalid args for tool invoke");
646 return Ok(Err(LegacyIfaceError::InvalidArg));
647 }
648 };
649
650 let tenant = ctx.map(|ctx| map_legacy_tenant_ctx(ctx, &self.default_env));
651
652 let request = ExecRequest {
653 component: tool.clone(),
654 action: action.clone(),
655 args,
656 tenant,
657 };
658
659 if let Some(mock) = &self.mocks
660 && let Some(result) = mock.tool_short_circuit(&tool, &action)
661 {
662 return match result.and_then(|value| {
663 serde_json::to_string(&value)
664 .map_err(|err| anyhow!("failed to serialise mock tool output: {err}"))
665 }) {
666 Ok(body) => Ok(Ok(body)),
667 Err(err) => {
668 tracing::error!(error = %err, "mock tool execution failed");
669 Ok(Err(LegacyIfaceError::Internal))
670 }
671 };
672 }
673
674 match greentic_mcp::exec(request, &exec_config) {
675 Ok(value) => match serde_json::to_string(&value) {
676 Ok(body) => Ok(Ok(body)),
677 Err(err) => {
678 tracing::error!(error = %err, "failed to serialise tool result");
679 Ok(Err(LegacyIfaceError::Internal))
680 }
681 },
682 Err(err) => {
683 tracing::warn!(%tool, %action, error = %err, "tool invoke failed");
684 let iface_err = match err {
685 ExecError::NotFound { .. } => LegacyIfaceError::NotFound,
686 ExecError::Tool { .. } => LegacyIfaceError::Denied,
687 _ => LegacyIfaceError::Unavailable,
688 };
689 Ok(Err(iface_err))
690 }
691 }
692 }
693 }
694
695 fn http_fetch(
696 &mut self,
697 req: LegacyHttpRequest,
698 _ctx: Option<LegacyTenantCtx>,
699 ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
700 if !self.config.http_enabled {
701 tracing::warn!(url = %req.url, "http fetch denied by policy");
702 return Ok(Err(LegacyIfaceError::Denied));
703 }
704
705 let mut mock_state = None;
706 let raw_body = req.body.clone();
707 if let Some(mock) = &self.mocks
708 && let Ok(meta) = HttpMockRequest::new(
709 &req.method,
710 &req.url,
711 raw_body.as_deref().map(|body| body.as_bytes()),
712 )
713 {
714 match mock.http_begin(&meta) {
715 HttpDecision::Mock(response) => {
716 let http = LegacyHttpResponse::from(&response);
717 return Ok(Ok(http));
718 }
719 HttpDecision::Deny(reason) => {
720 tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
721 return Ok(Err(LegacyIfaceError::Denied));
722 }
723 HttpDecision::Passthrough { record } => {
724 mock_state = Some((meta, record));
725 }
726 }
727 }
728
729 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
730 let mut builder = self.http_client.request(method, &req.url);
731
732 if let Some(headers_json) = req.headers_json.as_ref() {
733 match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
734 Ok(map) => {
735 for (key, value) in map {
736 if let Some(val) = value.as_str()
737 && let Ok(header) =
738 reqwest::header::HeaderName::from_bytes(key.as_bytes())
739 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
740 {
741 builder = builder.header(header, header_value);
742 }
743 }
744 }
745 Err(err) => {
746 tracing::warn!(error = %err, "failed to parse headers for http.fetch");
747 }
748 }
749 }
750
751 if let Some(body) = raw_body.clone() {
752 builder = builder.body(body);
753 }
754
755 let response = match builder.send() {
756 Ok(resp) => resp,
757 Err(err) => {
758 tracing::error!(url = %req.url, error = %err, "http fetch failed");
759 return Ok(Err(LegacyIfaceError::Unavailable));
760 }
761 };
762
763 let status = response.status().as_u16();
764 let headers_map = response
765 .headers()
766 .iter()
767 .map(|(k, v)| {
768 (
769 k.as_str().to_string(),
770 v.to_str().unwrap_or_default().to_string(),
771 )
772 })
773 .collect::<BTreeMap<_, _>>();
774 let headers_json = serde_json::to_string(&headers_map).ok();
775 let body = response.text().ok();
776
777 if let Some((meta, true)) = mock_state.take()
778 && let Some(mock) = &self.mocks
779 {
780 let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
781 mock.http_record(&meta, &recorded);
782 }
783
784 Ok(Ok(LegacyHttpResponse {
785 status,
786 headers_json,
787 body,
788 }))
789 }
790}
791
792impl PackRuntime {
793 #[allow(clippy::too_many_arguments)]
794 pub async fn load(
795 path: impl AsRef<Path>,
796 config: Arc<HostConfig>,
797 mocks: Option<Arc<MockLayer>>,
798 archive_source: Option<&Path>,
799 session_store: Option<DynSessionStore>,
800 state_store: Option<DynStateStore>,
801 wasi_policy: Arc<RunnerWasiPolicy>,
802 secrets: DynSecretsManager,
803 verify_archive: bool,
804 ) -> Result<Self> {
805 let path = path.as_ref();
806 let is_component = path
807 .extension()
808 .and_then(|ext| ext.to_str())
809 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
810 .unwrap_or(false);
811 let archive_hint = archive_source.or(if is_component { None } else { Some(path) });
812 if verify_archive {
813 let verify_target = archive_hint.unwrap_or(path);
814 verify::verify_pack(verify_target).await?;
815 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
816 }
817 let engine = Engine::default();
818 let wasm_bytes = fs::read(path).await?;
819 let mut metadata =
820 PackMetadata::from_wasm(&wasm_bytes).unwrap_or_else(|| PackMetadata::fallback(path));
821 let mut flows = if let Some(archive_path) = archive_hint {
822 match PackFlows::from_archive(archive_path) {
823 Ok(cache) => {
824 metadata = cache.metadata.clone();
825 Some(cache)
826 }
827 Err(err) => {
828 warn!(
829 error = %err,
830 pack = %archive_path.display(),
831 "failed to parse pack flows via greentic-pack; falling back to component exports"
832 );
833 None
834 }
835 }
836 } else {
837 None
838 };
839 let component = match Component::from_file(&engine, path) {
840 Ok(component) => Some(component),
841 Err(err) => {
842 if let Some(archive_path) = archive_hint {
843 if flows.is_none() {
844 let archive_data =
845 PackFlows::from_archive(archive_path).with_context(|| {
846 format!(
847 "failed to build flow cache from {}",
848 archive_path.display()
849 )
850 })?;
851 metadata = archive_data.metadata.clone();
852 flows = Some(archive_data);
853 }
854 warn!(
855 error = %err,
856 pack = %archive_path.display(),
857 "component load failed; continuing with cached flow metadata"
858 );
859 None
860 } else {
861 return Err(err);
862 }
863 }
864 };
865 Ok(Self {
866 path: path.to_path_buf(),
867 config,
868 engine,
869 component,
870 metadata,
871 mocks,
872 flows,
873 session_store,
874 state_store,
875 wasi_policy,
876 secrets,
877 })
878 }
879
880 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
881 if let Some(cache) = &self.flows {
882 return Ok(cache.descriptors.clone());
883 }
884 tracing::trace!(
885 tenant = %self.config.tenant,
886 pack_path = %self.path.display(),
887 "listing flows from pack"
888 );
889 let component = self
890 .component
891 .as_ref()
892 .ok_or_else(|| anyhow!("pack component unavailable"))?;
893 let host_state = HostState::new(
894 Arc::clone(&self.config),
895 self.mocks.clone(),
896 self.session_store.clone(),
897 self.state_store.clone(),
898 Arc::clone(&self.secrets),
899 )?;
900 let mut store = Store::new(
901 &self.engine,
902 ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?,
903 );
904 let mut linker = Linker::new(&self.engine);
905 imports::register_all(&mut linker)?;
906 let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
907 let exports = bindings.greentic_pack_export_exports();
908 let flows_raw = match exports.call_list_flows(&mut store)? {
909 Ok(flows) => flows,
910 Err(err) => {
911 bail!("pack list_flows failed: {err:?}");
912 }
913 };
914 let flows = flows_raw
915 .into_iter()
916 .map(|flow: FlowInfo| {
917 let profile = flow.profile.clone();
918 let version = flow.version.clone();
919 FlowDescriptor {
920 id: flow.id,
921 flow_type: flow.flow_type,
922 profile,
923 version,
924 description: Some(format!("{}@{}", flow.profile, flow.version)),
925 }
926 })
927 .collect();
928 Ok(flows)
929 }
930
931 #[allow(dead_code)]
932 pub async fn run_flow(
933 &self,
934 _flow_id: &str,
935 _input: serde_json::Value,
936 ) -> Result<serde_json::Value> {
937 Ok(serde_json::json!({}))
939 }
940
941 pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
942 if let Some(cache) = &self.flows {
943 return cache
944 .flows
945 .get(flow_id)
946 .cloned()
947 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
948 }
949 let component = self
950 .component
951 .as_ref()
952 .ok_or_else(|| anyhow!("pack component unavailable"))?;
953 let host_state = HostState::new(
954 Arc::clone(&self.config),
955 self.mocks.clone(),
956 self.session_store.clone(),
957 self.state_store.clone(),
958 Arc::clone(&self.secrets),
959 )?;
960 let mut store = Store::new(
961 &self.engine,
962 ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?,
963 );
964 let mut linker = Linker::new(&self.engine);
965 imports::register_all(&mut linker)?;
966 let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
967 let exports = bindings.greentic_pack_export_exports();
968 let flow_name = flow_id.to_string();
969 let metadata = match exports.call_flow_metadata(&mut store, &flow_name)? {
970 Ok(doc) => doc,
971 Err(err) => bail!("pack flow_metadata({flow_id}) failed: {err:?}"),
972 };
973 let flow_doc: greentic_flow::model::FlowDoc = serde_json::from_str(&metadata)
974 .or_else(|_| serde_yaml::from_str(&metadata))
975 .with_context(|| format!("failed to parse flow metadata for {flow_id}"))?;
976 let ir = greentic_flow::to_ir(flow_doc)?;
977 Ok(ir)
978 }
979
980 pub fn metadata(&self) -> &PackMetadata {
981 &self.metadata
982 }
983}
984
/// Converts a legacy (v0.2) tenant context into the shared tenant context type.
///
/// The environment comes from `ctx.deployment.runtime`, falling back to
/// `default_env`. Team and user ids that fail to parse are dropped rather than
/// reported. The trace id is copied through.
///
/// # Panics
/// Panics when the environment or the tenant id does not parse.
#[cfg(feature = "mcp")]
fn map_legacy_tenant_ctx(ctx: LegacyTenantCtx, default_env: &str) -> TypesTenantCtx {
    let env = match ctx.deployment.runtime {
        Some(runtime) => runtime,
        None => default_env.to_string(),
    };

    let env_id = EnvId::from_str(&env).expect("invalid env id");
    let tenant_id = TenantId::from_str(&ctx.tenant).expect("invalid tenant id");

    let team = ctx
        .team
        .as_deref()
        .and_then(|raw| TeamId::from_str(raw).ok());
    let user = ctx
        .user
        .as_deref()
        .and_then(|raw| UserId::from_str(raw).ok());

    let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id)
        .with_team(team)
        .with_user(user);
    tenant_ctx.trace_id = ctx.trace_id;
    tenant_ctx
}
1008
1009fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
1010 match err {
1011 LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
1012 LegacyIfaceError::NotFound => types::IfaceError::NotFound,
1013 LegacyIfaceError::Denied => types::IfaceError::Denied,
1014 LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
1015 LegacyIfaceError::Internal => types::IfaceError::Internal,
1016 }
1017}
1018
1019struct PackFlows {
1020 descriptors: Vec<FlowDescriptor>,
1021 flows: HashMap<String, FlowIR>,
1022 metadata: PackMetadata,
1023}
1024
1025impl PackFlows {
1026 fn from_archive(path: &Path) -> Result<Self> {
1027 let pack = open_pack(path, SigningPolicy::DevOk)
1028 .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1029 let file = File::open(path)
1030 .with_context(|| format!("failed to open archive {}", path.display()))?;
1031 let mut archive = ZipArchive::new(file)
1032 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1033 let mut flows = HashMap::new();
1034 let mut descriptors = Vec::new();
1035 for flow in &pack.manifest.flows {
1036 match load_flow_entry(&mut archive, flow, &pack.manifest.meta) {
1037 Ok((descriptor, ir)) => {
1038 flows.insert(descriptor.id.clone(), ir);
1039 descriptors.push(descriptor);
1040 }
1041 Err(err) => {
1042 warn!(
1043 error = %err,
1044 flow_id = flow.id,
1045 "failed to parse flow metadata from archive; using stub flow"
1046 );
1047 let stub = build_stub_flow_ir(&flow.id, &flow.kind);
1048 flows.insert(flow.id.clone(), stub.clone());
1049 descriptors.push(FlowDescriptor {
1050 id: flow.id.clone(),
1051 flow_type: flow.kind.clone(),
1052 profile: pack.manifest.meta.name.clone(),
1053 version: pack.manifest.meta.version.to_string(),
1054 description: Some(flow.kind.clone()),
1055 });
1056 }
1057 }
1058 }
1059
1060 Ok(Self {
1061 metadata: PackMetadata::from_manifest(&pack.manifest),
1062 descriptors,
1063 flows,
1064 })
1065 }
1066}
1067
1068fn load_flow_entry(
1069 archive: &mut ZipArchive<File>,
1070 entry: &greentic_pack::builder::FlowEntry,
1071 meta: &greentic_pack::builder::PackMeta,
1072) -> Result<(FlowDescriptor, FlowIR)> {
1073 let doc = load_flow_doc(archive, entry)?;
1074 let ir = greentic_flow::to_ir(doc.clone())
1075 .with_context(|| format!("failed to build IR for flow {}", doc.id))?;
1076 let descriptor = FlowDescriptor {
1077 id: doc.id.clone(),
1078 flow_type: doc.flow_type.clone(),
1079 profile: meta.name.clone(),
1080 version: meta.version.to_string(),
1081 description: doc
1082 .description
1083 .clone()
1084 .or(doc.title.clone())
1085 .or_else(|| Some(entry.kind.clone())),
1086 };
1087 Ok((descriptor, ir))
1088}
1089
1090fn load_flow_doc(
1091 archive: &mut ZipArchive<File>,
1092 entry: &greentic_pack::builder::FlowEntry,
1093) -> Result<greentic_flow::model::FlowDoc> {
1094 if let Ok(bytes) = read_entry(archive, &entry.file_yaml)
1095 && let Ok(doc) = serde_yaml::from_slice(&bytes)
1096 {
1097 return Ok(doc);
1098 }
1099 let json_bytes = read_entry(archive, &entry.file_json)
1100 .with_context(|| format!("missing flow document {}", entry.file_json))?;
1101 let doc = serde_json::from_slice(&json_bytes)
1102 .with_context(|| format!("failed to parse {}", entry.file_json))?;
1103 Ok(doc)
1104}
1105
1106fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1107 let mut file = archive
1108 .by_name(name)
1109 .with_context(|| format!("entry {name} missing from archive"))?;
1110 let mut buf = Vec::new();
1111 file.read_to_end(&mut buf)?;
1112 Ok(buf)
1113}
1114
1115#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1116pub struct PackMetadata {
1117 pub pack_id: String,
1118 pub version: String,
1119 #[serde(default)]
1120 pub entry_flows: Vec<String>,
1121}
1122
1123impl PackMetadata {
1124 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1125 let parser = Parser::new(0);
1126 for payload in parser.parse_all(bytes) {
1127 let payload = payload.ok()?;
1128 match payload {
1129 Payload::CustomSection(section) => {
1130 if section.name() == "greentic.manifest"
1131 && let Ok(meta) = Self::from_bytes(section.data())
1132 {
1133 return Some(meta);
1134 }
1135 }
1136 Payload::DataSection(reader) => {
1137 for segment in reader.into_iter().flatten() {
1138 if let Ok(meta) = Self::from_bytes(segment.data) {
1139 return Some(meta);
1140 }
1141 }
1142 }
1143 _ => {}
1144 }
1145 }
1146 None
1147 }
1148
1149 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1150 #[derive(Deserialize)]
1151 struct RawManifest {
1152 pack_id: String,
1153 version: String,
1154 #[serde(default)]
1155 entry_flows: Vec<String>,
1156 #[serde(default)]
1157 flows: Vec<RawFlow>,
1158 }
1159
1160 #[derive(Deserialize)]
1161 struct RawFlow {
1162 id: String,
1163 }
1164
1165 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1166 let mut entry_flows = if manifest.entry_flows.is_empty() {
1167 manifest.flows.iter().map(|f| f.id.clone()).collect()
1168 } else {
1169 manifest.entry_flows.clone()
1170 };
1171 entry_flows.retain(|id| !id.is_empty());
1172 Ok(Self {
1173 pack_id: manifest.pack_id,
1174 version: manifest.version,
1175 entry_flows,
1176 })
1177 }
1178
1179 pub fn fallback(path: &Path) -> Self {
1180 let pack_id = path
1181 .file_stem()
1182 .map(|s| s.to_string_lossy().into_owned())
1183 .unwrap_or_else(|| "unknown-pack".to_string());
1184 Self {
1185 pack_id,
1186 version: "0.0.0".to_string(),
1187 entry_flows: Vec::new(),
1188 }
1189 }
1190
1191 pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1192 let entry_flows = if manifest.meta.entry_flows.is_empty() {
1193 manifest
1194 .flows
1195 .iter()
1196 .map(|flow| flow.id.clone())
1197 .collect::<Vec<_>>()
1198 } else {
1199 manifest.meta.entry_flows.clone()
1200 };
1201 Self {
1202 pack_id: manifest.meta.pack_id.clone(),
1203 version: manifest.meta.version.to_string(),
1204 entry_flows,
1205 }
1206 }
1207}
1208
1209fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1210 let mut nodes = IndexMap::new();
1211 nodes.insert(
1212 "complete".into(),
1213 NodeIR {
1214 component: "qa.process".into(),
1215 payload_expr: serde_json::json!({
1216 "status": "done",
1217 "flow_id": flow_id,
1218 }),
1219 routes: vec![RouteIR {
1220 to: None,
1221 out: true,
1222 }],
1223 },
1224 );
1225 FlowIR {
1226 id: flow_id.to_string(),
1227 flow_type: flow_type.to_string(),
1228 start: Some("complete".into()),
1229 parameters: Value::Object(Default::default()),
1230 nodes,
1231 }
1232}
1233
1234impl From<&HttpMockResponse> for LegacyHttpResponse {
1235 fn from(value: &HttpMockResponse) -> Self {
1236 let headers_json = serde_json::to_string(&value.headers).ok();
1237 Self {
1238 status: value.status,
1239 headers_json,
1240 body: value.body.clone(),
1241 }
1242 }
1243}