1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, Store, WasmResult};
9use anyhow::{Context, Result, anyhow, bail};
10use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
11use greentic_interfaces::host_import_v0_2;
12use greentic_interfaces::host_import_v0_2::greentic::host_import::imports::{
13 HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
14 IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
15};
16use greentic_interfaces::host_import_v0_6::{self, iface_types, state, types};
17use greentic_interfaces::pack_export_v0_2;
18use greentic_interfaces::pack_export_v0_2::exports::greentic::pack_export::exports::FlowInfo;
19#[cfg(feature = "mcp")]
20use greentic_mcp::{ExecConfig, ExecError, ExecRequest};
21use greentic_session::SessionKey as StoreSessionKey;
22use greentic_types::{
23 EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
24 TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
25};
26use indexmap::IndexMap;
27use reqwest::blocking::Client as BlockingClient;
28use serde::{Deserialize, Serialize};
29use serde_cbor;
30use serde_json::{self, Value, json};
31use serde_yaml_bw as serde_yaml;
32use tokio::fs;
33use wasmparser::{Parser, Payload};
34use zip::ZipArchive;
35
36use crate::imports;
37use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
38
39use crate::config::HostConfig;
40use crate::storage::state::STATE_PREFIX;
41use crate::storage::{DynSessionStore, DynStateStore};
42use crate::verify;
43use crate::wasi::RunnerWasiPolicy;
44use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
45
/// A loaded pack: the compiled component (when available), its metadata, and the host-side
/// resources that are handed to every instantiation of the pack.
pub struct PackRuntime {
    /// Filesystem path the pack was loaded from.
    path: PathBuf,
    /// Shared host configuration, cloned into each `HostState`.
    config: Arc<HostConfig>,
    /// Engine used to compile the component and to create stores.
    engine: Engine,
    /// Compiled component; `None` when loading fell back to the manifest archive.
    component: Option<Component>,
    /// Pack metadata (parsed from the wasm bytes, derived from the path, or taken from the
    /// archive manifest).
    metadata: PackMetadata,
    /// Optional mock layer forwarded to each `HostState`.
    mocks: Option<Arc<MockLayer>>,
    /// Flows read from the manifest archive; set only when the component failed to load.
    archive: Option<ArchiveFlows>,
    /// Optional session store forwarded to each `HostState`.
    session_store: Option<DynSessionStore>,
    /// Optional state store forwarded to each `HostState`.
    state_store: Option<DynStateStore>,
    /// WASI policy used to build the WASI context of each instantiation.
    wasi_policy: Arc<RunnerWasiPolicy>,
}
58
/// Summary of a single flow exposed by a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile the flow belongs to.
    pub profile: String,
    /// Version string of the flow.
    pub version: String,
    /// Optional human-readable description; defaults to `None` when absent on input.
    #[serde(default)]
    pub description: Option<String>,
}
69
/// Host-side state backing the host import implementations (secrets, telemetry, HTTP,
/// tool execution, state and session storage).
pub struct HostState {
    /// Shared host configuration (tenant, secrets policy, HTTP switch, ...).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used by `http_fetch`.
    http_client: BlockingClient,
    /// Tool execution configuration; `None` when it could not be derived from `config`.
    #[cfg(feature = "mcp")]
    exec_config: Option<ExecConfig>,
    /// Environment name read from `GREENTIC_ENV`, or `"local"` when the variable is unset.
    default_env: String,
    /// Optional session store; calls needing it report `Unavailable` when it is `None`.
    session_store: Option<DynSessionStore>,
    /// Optional state store; calls needing it report `Unavailable` when it is `None`.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real secrets/telemetry/tool/HTTP handling.
    mocks: Option<Arc<MockLayer>>,
}
80
81impl HostState {
82 pub fn new(
83 config: Arc<HostConfig>,
84 mocks: Option<Arc<MockLayer>>,
85 session_store: Option<DynSessionStore>,
86 state_store: Option<DynStateStore>,
87 ) -> Result<Self> {
88 let http_client = BlockingClient::builder().build()?;
89 #[cfg(feature = "mcp")]
90 let exec_config = config.mcp_exec_config().ok();
91 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
92 Ok(Self {
93 config,
94 http_client,
95 #[cfg(feature = "mcp")]
96 exec_config,
97 default_env,
98 session_store,
99 state_store,
100 mocks,
101 })
102 }
103
104 pub fn get_secret(&self, key: &str) -> Result<String> {
105 if !self.config.secrets_policy.is_allowed(key) {
106 bail!("secret {key} is not permitted by bindings policy");
107 }
108 if let Some(mock) = &self.mocks
109 && let Some(value) = mock.secrets_lookup(key)
110 {
111 return Ok(value);
112 }
113 if let Ok(value) = std::env::var(key) {
114 return Ok(value);
115 }
116 bail!("secret {key} not found in environment");
117 }
118
119 fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
120 let tenant_raw = ctx
121 .as_ref()
122 .map(|ctx| ctx.tenant.clone())
123 .unwrap_or_else(|| self.config.tenant.clone());
124 let tenant_id = TenantId::from_str(&tenant_raw)
125 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
126 let env_id = EnvId::from_str(&self.default_env)
127 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
128 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
129 if let Some(ctx) = ctx {
130 if let Some(team) = ctx.team {
131 let team_id =
132 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
133 tenant_ctx = tenant_ctx.with_team(Some(team_id));
134 }
135 if let Some(user) = ctx.user {
136 let user_id =
137 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
138 tenant_ctx = tenant_ctx.with_user(Some(user_id));
139 }
140 if let Some(flow) = ctx.flow_id {
141 tenant_ctx = tenant_ctx.with_flow(flow);
142 }
143 if let Some(node) = ctx.node_id {
144 tenant_ctx = tenant_ctx.with_node(node);
145 }
146 if let Some(provider) = ctx.provider_id {
147 tenant_ctx = tenant_ctx.with_provider(provider);
148 }
149 if let Some(session) = ctx.session_id {
150 tenant_ctx = tenant_ctx.with_session(session);
151 }
152 tenant_ctx.trace_id = ctx.trace_id;
153 }
154 Ok(tenant_ctx)
155 }
156
157 fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
158 self.session_store
159 .as_ref()
160 .cloned()
161 .ok_or(types::IfaceError::Unavailable)
162 }
163
164 fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
165 self.state_store
166 .as_ref()
167 .cloned()
168 .ok_or(types::IfaceError::Unavailable)
169 }
170
171 fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
172 ctx.user_id
173 .clone()
174 .or_else(|| ctx.user.clone())
175 .ok_or(types::IfaceError::InvalidArg)
176 }
177
178 fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
179 let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
180 FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
181 }
182
183 fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
184 let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
185 if let Some(reason) = cursor.wait_reason {
186 store_cursor = store_cursor.with_wait_reason(reason);
187 }
188 if let Some(marker) = cursor.outbox_marker {
189 store_cursor = store_cursor.with_outbox_marker(marker);
190 }
191 store_cursor
192 }
193}
194
/// Per-store data for a pack component instance: host import state plus the WASI context
/// and its resource table.
pub struct ComponentState {
    /// Host import implementation that the trait impls on this type delegate to.
    host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table exposed together with the WASI context.
    resource_table: ResourceTable,
}
200
201impl ComponentState {
202 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
203 let wasi_ctx = policy
204 .instantiate()
205 .context("failed to build WASI context")?;
206 Ok(Self {
207 host,
208 wasi_ctx,
209 resource_table: ResourceTable::new(),
210 })
211 }
212
213 fn host_mut(&mut self) -> &mut HostState {
214 &mut self.host
215 }
216}
217
218impl WasiView for ComponentState {
219 fn ctx(&mut self) -> WasiCtxView<'_> {
220 WasiCtxView {
221 ctx: &mut self.wasi_ctx,
222 table: &mut self.resource_table,
223 }
224 }
225}
226
// Manually asserts that `ComponentState` may be moved to and shared between threads; the
// compiler does not check its fields for these impls.
// NOTE(review): no SAFETY justification is present in this file. Confirm that `HostState`,
// `WasiCtx` and `ResourceTable` are actually safe to send/share, or remove these impls.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
231
232impl host_import_v0_6::HostImports for ComponentState {
233 fn secrets_get(
234 &mut self,
235 key: String,
236 ctx: Option<types::TenantCtx>,
237 ) -> WasmResult<Result<String, types::IfaceError>> {
238 host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
239 }
240
241 fn telemetry_emit(
242 &mut self,
243 span_json: String,
244 ctx: Option<types::TenantCtx>,
245 ) -> WasmResult<()> {
246 host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
247 }
248
249 fn http_fetch(
250 &mut self,
251 req: host_import_v0_6::http::HttpRequest,
252 ctx: Option<types::TenantCtx>,
253 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
254 host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
255 }
256
257 fn mcp_exec(
258 &mut self,
259 component: String,
260 action: String,
261 args_json: String,
262 ctx: Option<types::TenantCtx>,
263 ) -> WasmResult<Result<String, types::IfaceError>> {
264 host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
265 }
266
267 fn state_get(
268 &mut self,
269 key: iface_types::StateKey,
270 ctx: Option<types::TenantCtx>,
271 ) -> WasmResult<Result<String, types::IfaceError>> {
272 host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
273 }
274
275 fn state_set(
276 &mut self,
277 key: iface_types::StateKey,
278 value_json: String,
279 ctx: Option<types::TenantCtx>,
280 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
281 host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
282 }
283
284 fn session_update(
285 &mut self,
286 cursor: iface_types::SessionCursor,
287 ctx: Option<types::TenantCtx>,
288 ) -> WasmResult<Result<String, types::IfaceError>> {
289 host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
290 }
291}
292
293impl host_import_v0_2::HostImports for ComponentState {
294 fn secrets_get(
295 &mut self,
296 key: String,
297 ctx: Option<LegacyTenantCtx>,
298 ) -> WasmResult<Result<String, LegacyIfaceError>> {
299 host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
300 }
301
302 fn telemetry_emit(
303 &mut self,
304 span_json: String,
305 ctx: Option<LegacyTenantCtx>,
306 ) -> WasmResult<()> {
307 host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
308 }
309
310 fn tool_invoke(
311 &mut self,
312 tool: String,
313 action: String,
314 args_json: String,
315 ctx: Option<LegacyTenantCtx>,
316 ) -> WasmResult<Result<String, LegacyIfaceError>> {
317 host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
318 }
319
320 fn http_fetch(
321 &mut self,
322 req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
323 ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
324 ) -> WasmResult<
325 Result<
326 host_import_v0_2::greentic::host_import::imports::HttpResponse,
327 host_import_v0_2::greentic::host_import::imports::IfaceError,
328 >,
329 > {
330 host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
331 }
332}
333
334impl host_import_v0_6::HostImports for HostState {
335 fn secrets_get(
336 &mut self,
337 key: String,
338 _ctx: Option<types::TenantCtx>,
339 ) -> WasmResult<Result<String, types::IfaceError>> {
340 Ok(self.get_secret(&key).map_err(|err| {
341 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
342 types::IfaceError::Denied
343 }))
344 }
345
346 fn telemetry_emit(
347 &mut self,
348 span_json: String,
349 _ctx: Option<types::TenantCtx>,
350 ) -> WasmResult<()> {
351 if let Some(mock) = &self.mocks
352 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
353 {
354 return Ok(());
355 }
356 tracing::info!(span = %span_json, "telemetry emit from pack");
357 Ok(())
358 }
359
360 fn http_fetch(
361 &mut self,
362 req: host_import_v0_6::http::HttpRequest,
363 _ctx: Option<types::TenantCtx>,
364 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
365 let legacy_req = LegacyHttpRequest {
366 method: req.method,
367 url: req.url,
368 headers_json: req.headers_json,
369 body: req.body,
370 };
371 match greentic_interfaces::host_import_v0_2::HostImports::http_fetch(
372 self, legacy_req, None,
373 )? {
374 Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
375 status: resp.status,
376 headers_json: resp.headers_json,
377 body: resp.body,
378 })),
379 Err(err) => Ok(Err(map_legacy_error(err))),
380 }
381 }
382
383 fn mcp_exec(
384 &mut self,
385 component: String,
386 action: String,
387 args_json: String,
388 ctx: Option<types::TenantCtx>,
389 ) -> WasmResult<Result<String, types::IfaceError>> {
390 #[cfg(not(feature = "mcp"))]
391 {
392 let _ = (component, action, args_json, ctx);
393 tracing::warn!("mcp.exec requested but crate built without `mcp` feature");
394 return Ok(Err(types::IfaceError::Unavailable));
395 }
396 #[cfg(feature = "mcp")]
397 {
398 let exec_config = match &self.exec_config {
399 Some(cfg) => cfg.clone(),
400 None => {
401 tracing::warn!(component = %component, action = %action, "exec config unavailable for tool invoke");
402 return Ok(Err(types::IfaceError::Unavailable));
403 }
404 };
405 let args: Value = match serde_json::from_str(&args_json) {
406 Ok(value) => value,
407 Err(err) => {
408 tracing::warn!(error = %err, "invalid args for mcp.exec node");
409 return Ok(Err(types::IfaceError::InvalidArg));
410 }
411 };
412 let tenant = match self.tenant_ctx_from_v6(ctx) {
413 Ok(ctx) => Some(ctx),
414 Err(err) => {
415 tracing::warn!(error = %err, "failed to parse tenant context for mcp.exec");
416 None
417 }
418 };
419 let request = ExecRequest {
420 component: component.clone(),
421 action: action.clone(),
422 args,
423 tenant,
424 };
425 match greentic_mcp::exec(request, &exec_config) {
426 Ok(value) => match serde_json::to_string(&value) {
427 Ok(body) => Ok(Ok(body)),
428 Err(err) => {
429 tracing::error!(error = %err, "failed to serialise tool result");
430 Ok(Err(types::IfaceError::Internal))
431 }
432 },
433 Err(err) => {
434 tracing::warn!(component = %component, action = %action, error = %err, "mcp exec failed");
435 let iface_err = match err {
436 ExecError::NotFound { .. } => types::IfaceError::NotFound,
437 ExecError::Tool { .. } => types::IfaceError::Denied,
438 _ => types::IfaceError::Unavailable,
439 };
440 Ok(Err(iface_err))
441 }
442 }
443 }
444 }
445
446 fn state_get(
447 &mut self,
448 key: iface_types::StateKey,
449 ctx: Option<types::TenantCtx>,
450 ) -> WasmResult<Result<String, types::IfaceError>> {
451 let store = match self.state_store_handle() {
452 Ok(store) => store,
453 Err(err) => return Ok(Err(err)),
454 };
455 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
456 Ok(ctx) => ctx,
457 Err(err) => {
458 tracing::warn!(error = %err, "invalid tenant context for state.get");
459 return Ok(Err(types::IfaceError::InvalidArg));
460 }
461 };
462 let key = StoreStateKey::from(key);
463 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
464 Ok(Some(value)) => {
465 let result = if let Some(text) = value.as_str() {
466 text.to_string()
467 } else {
468 serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
469 };
470 Ok(Ok(result))
471 }
472 Ok(None) => Ok(Err(types::IfaceError::NotFound)),
473 Err(err) => {
474 tracing::warn!(error = %err, "state.get failed");
475 Ok(Err(types::IfaceError::Internal))
476 }
477 }
478 }
479
480 fn state_set(
481 &mut self,
482 key: iface_types::StateKey,
483 value_json: String,
484 ctx: Option<types::TenantCtx>,
485 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
486 let store = match self.state_store_handle() {
487 Ok(store) => store,
488 Err(err) => return Ok(Err(err)),
489 };
490 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
491 Ok(ctx) => ctx,
492 Err(err) => {
493 tracing::warn!(error = %err, "invalid tenant context for state.set");
494 return Ok(Err(types::IfaceError::InvalidArg));
495 }
496 };
497 let key = StoreStateKey::from(key);
498 let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
499 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
500 Ok(()) => Ok(Ok(state::OpAck::Ok)),
501 Err(err) => {
502 tracing::warn!(error = %err, "state.set failed");
503 Ok(Err(types::IfaceError::Internal))
504 }
505 }
506 }
507
508 fn session_update(
509 &mut self,
510 cursor: iface_types::SessionCursor,
511 ctx: Option<types::TenantCtx>,
512 ) -> WasmResult<Result<String, types::IfaceError>> {
513 let store = match self.session_store_handle() {
514 Ok(store) => store,
515 Err(err) => return Ok(Err(err)),
516 };
517 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
518 Ok(ctx) => ctx,
519 Err(err) => {
520 tracing::warn!(error = %err, "invalid tenant context for session.update");
521 return Ok(Err(types::IfaceError::InvalidArg));
522 }
523 };
524 let user = match Self::ensure_user(&tenant_ctx) {
525 Ok(user) => user,
526 Err(err) => return Ok(Err(err)),
527 };
528 let flow_id = match Self::ensure_flow(&tenant_ctx) {
529 Ok(flow) => flow,
530 Err(err) => return Ok(Err(err)),
531 };
532 let cursor = Self::cursor_from_iface(cursor);
533 let payload = SessionData {
534 tenant_ctx: tenant_ctx.clone(),
535 flow_id,
536 cursor: cursor.clone(),
537 context_json: serde_json::json!({
538 "node_pointer": cursor.node_pointer,
539 "wait_reason": cursor.wait_reason,
540 "outbox_marker": cursor.outbox_marker,
541 })
542 .to_string(),
543 };
544 if let Some(existing) = tenant_ctx.session_id() {
545 let key = StoreSessionKey::from(existing.to_string());
546 if let Err(err) = store.update_session(&key, payload) {
547 tracing::error!(error = %err, "failed to update session snapshot");
548 return Ok(Err(types::IfaceError::Internal));
549 }
550 return Ok(Ok(existing.to_string()));
551 }
552 match store.find_by_user(&tenant_ctx, &user) {
553 Ok(Some((key, _))) => {
554 if let Err(err) = store.update_session(&key, payload) {
555 tracing::error!(error = %err, "failed to update existing user session");
556 return Ok(Err(types::IfaceError::Internal));
557 }
558 return Ok(Ok(key.to_string()));
559 }
560 Ok(None) => {}
561 Err(err) => {
562 tracing::error!(error = %err, "session lookup failed");
563 return Ok(Err(types::IfaceError::Internal));
564 }
565 }
566 let key = match store.create_session(&tenant_ctx, payload.clone()) {
567 Ok(key) => key,
568 Err(err) => {
569 tracing::error!(error = %err, "failed to create session");
570 return Ok(Err(types::IfaceError::Internal));
571 }
572 };
573 let ctx_with_session = tenant_ctx.with_session(key.to_string());
574 let updated_payload = SessionData {
575 tenant_ctx: ctx_with_session.clone(),
576 ..payload
577 };
578 if let Err(err) = store.update_session(&key, updated_payload) {
579 tracing::warn!(error = %err, "failed to stamp session id after create");
580 }
581 Ok(Ok(key.to_string()))
582 }
583}
584
585impl greentic_interfaces::host_import_v0_2::HostImports for HostState {
586 fn secrets_get(
587 &mut self,
588 key: String,
589 _ctx: Option<LegacyTenantCtx>,
590 ) -> WasmResult<Result<String, LegacyIfaceError>> {
591 Ok(self.get_secret(&key).map_err(|err| {
592 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
593 LegacyIfaceError::Denied
594 }))
595 }
596
597 fn telemetry_emit(
598 &mut self,
599 span_json: String,
600 _ctx: Option<LegacyTenantCtx>,
601 ) -> WasmResult<()> {
602 if let Some(mock) = &self.mocks
603 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
604 {
605 return Ok(());
606 }
607 tracing::info!(span = %span_json, "telemetry emit from pack");
608 Ok(())
609 }
610
611 fn tool_invoke(
612 &mut self,
613 tool: String,
614 action: String,
615 args_json: String,
616 ctx: Option<LegacyTenantCtx>,
617 ) -> WasmResult<Result<String, LegacyIfaceError>> {
618 #[cfg(not(feature = "mcp"))]
619 {
620 let _ = (tool, action, args_json, ctx);
621 tracing::warn!("tool invoke requested but crate built without `mcp` feature");
622 return Ok(Err(LegacyIfaceError::Unavailable));
623 }
624
625 #[cfg(feature = "mcp")]
626 {
627 let exec_config = match &self.exec_config {
628 Some(cfg) => cfg.clone(),
629 None => {
630 tracing::warn!(%tool, %action, "exec config unavailable for tool invoke");
631 return Ok(Err(LegacyIfaceError::Unavailable));
632 }
633 };
634
635 let args: Value = match serde_json::from_str(&args_json) {
636 Ok(value) => value,
637 Err(err) => {
638 tracing::warn!(error = %err, "invalid args for tool invoke");
639 return Ok(Err(LegacyIfaceError::InvalidArg));
640 }
641 };
642
643 let tenant = ctx.map(|ctx| map_legacy_tenant_ctx(ctx, &self.default_env));
644
645 let request = ExecRequest {
646 component: tool.clone(),
647 action: action.clone(),
648 args,
649 tenant,
650 };
651
652 if let Some(mock) = &self.mocks
653 && let Some(result) = mock.tool_short_circuit(&tool, &action)
654 {
655 return match result.and_then(|value| {
656 serde_json::to_string(&value)
657 .map_err(|err| anyhow!("failed to serialise mock tool output: {err}"))
658 }) {
659 Ok(body) => Ok(Ok(body)),
660 Err(err) => {
661 tracing::error!(error = %err, "mock tool execution failed");
662 Ok(Err(LegacyIfaceError::Internal))
663 }
664 };
665 }
666
667 match greentic_mcp::exec(request, &exec_config) {
668 Ok(value) => match serde_json::to_string(&value) {
669 Ok(body) => Ok(Ok(body)),
670 Err(err) => {
671 tracing::error!(error = %err, "failed to serialise tool result");
672 Ok(Err(LegacyIfaceError::Internal))
673 }
674 },
675 Err(err) => {
676 tracing::warn!(%tool, %action, error = %err, "tool invoke failed");
677 let iface_err = match err {
678 ExecError::NotFound { .. } => LegacyIfaceError::NotFound,
679 ExecError::Tool { .. } => LegacyIfaceError::Denied,
680 _ => LegacyIfaceError::Unavailable,
681 };
682 Ok(Err(iface_err))
683 }
684 }
685 }
686 }
687
688 fn http_fetch(
689 &mut self,
690 req: LegacyHttpRequest,
691 _ctx: Option<LegacyTenantCtx>,
692 ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
693 if !self.config.http_enabled {
694 tracing::warn!(url = %req.url, "http fetch denied by policy");
695 return Ok(Err(LegacyIfaceError::Denied));
696 }
697
698 let mut mock_state = None;
699 let raw_body = req.body.clone();
700 if let Some(mock) = &self.mocks
701 && let Ok(meta) = HttpMockRequest::new(
702 &req.method,
703 &req.url,
704 raw_body.as_deref().map(|body| body.as_bytes()),
705 )
706 {
707 match mock.http_begin(&meta) {
708 HttpDecision::Mock(response) => {
709 let http = LegacyHttpResponse::from(&response);
710 return Ok(Ok(http));
711 }
712 HttpDecision::Deny(reason) => {
713 tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
714 return Ok(Err(LegacyIfaceError::Denied));
715 }
716 HttpDecision::Passthrough { record } => {
717 mock_state = Some((meta, record));
718 }
719 }
720 }
721
722 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
723 let mut builder = self.http_client.request(method, &req.url);
724
725 if let Some(headers_json) = req.headers_json.as_ref() {
726 match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
727 Ok(map) => {
728 for (key, value) in map {
729 if let Some(val) = value.as_str()
730 && let Ok(header) =
731 reqwest::header::HeaderName::from_bytes(key.as_bytes())
732 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
733 {
734 builder = builder.header(header, header_value);
735 }
736 }
737 }
738 Err(err) => {
739 tracing::warn!(error = %err, "failed to parse headers for http.fetch");
740 }
741 }
742 }
743
744 if let Some(body) = raw_body.clone() {
745 builder = builder.body(body);
746 }
747
748 let response = match builder.send() {
749 Ok(resp) => resp,
750 Err(err) => {
751 tracing::error!(url = %req.url, error = %err, "http fetch failed");
752 return Ok(Err(LegacyIfaceError::Unavailable));
753 }
754 };
755
756 let status = response.status().as_u16();
757 let headers_map = response
758 .headers()
759 .iter()
760 .map(|(k, v)| {
761 (
762 k.as_str().to_string(),
763 v.to_str().unwrap_or_default().to_string(),
764 )
765 })
766 .collect::<BTreeMap<_, _>>();
767 let headers_json = serde_json::to_string(&headers_map).ok();
768 let body = response.text().ok();
769
770 if let Some((meta, true)) = mock_state.take()
771 && let Some(mock) = &self.mocks
772 {
773 let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
774 mock.http_record(&meta, &recorded);
775 }
776
777 Ok(Ok(LegacyHttpResponse {
778 status,
779 headers_json,
780 body,
781 }))
782 }
783}
784
785impl PackRuntime {
786 #[allow(clippy::too_many_arguments)]
787 pub async fn load(
788 path: impl AsRef<Path>,
789 config: Arc<HostConfig>,
790 mocks: Option<Arc<MockLayer>>,
791 archive_source: Option<&Path>,
792 session_store: Option<DynSessionStore>,
793 state_store: Option<DynStateStore>,
794 wasi_policy: Arc<RunnerWasiPolicy>,
795 verify_archive: bool,
796 ) -> Result<Self> {
797 let path = path.as_ref();
798 let is_component = path
799 .extension()
800 .and_then(|ext| ext.to_str())
801 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
802 .unwrap_or(false);
803 if verify_archive && is_component {
804 if let Some(archive) = archive_source {
805 verify::verify_pack(archive).await?;
806 tracing::info!(
807 pack_path = %archive.display(),
808 "pack verification complete (archive)"
809 );
810 }
811 } else if verify_archive {
812 verify::verify_pack(path).await?;
813 tracing::info!(pack_path = %path.display(), "pack verification complete");
814 }
815 let engine = Engine::default();
816 let wasm_bytes = fs::read(path).await?;
817 let mut metadata =
818 PackMetadata::from_wasm(&wasm_bytes).unwrap_or_else(|| PackMetadata::fallback(path));
819 let mut archive = None;
820 let component = match Component::from_file(&engine, path) {
821 Ok(component) => Some(component),
822 Err(err) => {
823 if let Some(archive_path) = archive_source {
824 tracing::warn!(
825 error = %err,
826 pack = %archive_path.display(),
827 "component load failed, using manifest archive"
828 );
829 let archive_data = ArchiveFlows::from_archive(archive_path)?;
830 metadata = archive_data.metadata.clone();
831 archive = Some(archive_data);
832 None
833 } else {
834 return Err(err);
835 }
836 }
837 };
838 Ok(Self {
839 path: path.to_path_buf(),
840 config,
841 engine,
842 component,
843 metadata,
844 mocks,
845 archive,
846 session_store,
847 state_store,
848 wasi_policy,
849 })
850 }
851
852 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
853 if let Some(archive) = &self.archive {
854 return Ok(archive.descriptors.clone());
855 }
856 tracing::trace!(
857 tenant = %self.config.tenant,
858 pack_path = %self.path.display(),
859 "listing flows from pack"
860 );
861 let component = self
862 .component
863 .as_ref()
864 .ok_or_else(|| anyhow!("pack component unavailable"))?;
865 let host_state = HostState::new(
866 Arc::clone(&self.config),
867 self.mocks.clone(),
868 self.session_store.clone(),
869 self.state_store.clone(),
870 )?;
871 let mut store = Store::new(
872 &self.engine,
873 ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?,
874 );
875 let mut linker = Linker::new(&self.engine);
876 imports::register_all(&mut linker)?;
877 let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
878 let exports = bindings.greentic_pack_export_exports();
879 let flows_raw = match exports.call_list_flows(&mut store)? {
880 Ok(flows) => flows,
881 Err(err) => {
882 bail!("pack list_flows failed: {err:?}");
883 }
884 };
885 let flows = flows_raw
886 .into_iter()
887 .map(|flow: FlowInfo| {
888 let profile = flow.profile.clone();
889 let version = flow.version.clone();
890 FlowDescriptor {
891 id: flow.id,
892 flow_type: flow.flow_type,
893 profile,
894 version,
895 description: Some(format!("{}@{}", flow.profile, flow.version)),
896 }
897 })
898 .collect();
899 Ok(flows)
900 }
901
902 #[allow(dead_code)]
903 pub async fn run_flow(
904 &self,
905 _flow_id: &str,
906 _input: serde_json::Value,
907 ) -> Result<serde_json::Value> {
908 Ok(serde_json::json!({}))
910 }
911
912 pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
913 if let Some(archive) = &self.archive {
914 return archive
915 .flows
916 .get(flow_id)
917 .cloned()
918 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in archive"));
919 }
920 let component = self
921 .component
922 .as_ref()
923 .ok_or_else(|| anyhow!("pack component unavailable"))?;
924 let host_state = HostState::new(
925 Arc::clone(&self.config),
926 self.mocks.clone(),
927 self.session_store.clone(),
928 self.state_store.clone(),
929 )?;
930 let mut store = Store::new(
931 &self.engine,
932 ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?,
933 );
934 let mut linker = Linker::new(&self.engine);
935 imports::register_all(&mut linker)?;
936 let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
937 let exports = bindings.greentic_pack_export_exports();
938 let flow_name = flow_id.to_string();
939 let metadata = match exports.call_flow_metadata(&mut store, &flow_name)? {
940 Ok(doc) => doc,
941 Err(err) => bail!("pack flow_metadata({flow_id}) failed: {err:?}"),
942 };
943 let flow_doc: greentic_flow::model::FlowDoc = serde_json::from_str(&metadata)
944 .or_else(|_| serde_yaml::from_str(&metadata))
945 .with_context(|| format!("failed to parse flow metadata for {flow_id}"))?;
946 let ir = greentic_flow::to_ir(flow_doc)?;
947 Ok(ir)
948 }
949
950 pub fn metadata(&self) -> &PackMetadata {
951 &self.metadata
952 }
953}
954
/// Converts a legacy (v0.2) guest tenant context into the host tenant context type.
///
/// The environment is the context's `deployment.runtime`, or `default_env` when absent.
/// Team and user ids that fail to parse are dropped rather than reported.
///
/// # Panics
/// Panics when the environment or tenant id does not parse.
// NOTE(review): both ids originate from the guest; consider returning an error instead of
// panicking (requires a signature change together with the caller).
#[cfg(feature = "mcp")]
fn map_legacy_tenant_ctx(ctx: LegacyTenantCtx, default_env: &str) -> TypesTenantCtx {
    let env = match ctx.deployment.runtime {
        Some(runtime) => runtime,
        None => default_env.to_string(),
    };

    let env_id = EnvId::from_str(env.as_str()).expect("invalid env id");
    let tenant_id = TenantId::from_str(ctx.tenant.as_str()).expect("invalid tenant id");

    let team = match ctx.team.as_ref() {
        Some(team) => TeamId::from_str(team.as_str()).ok(),
        None => None,
    };
    let user = match ctx.user.as_ref() {
        Some(user) => UserId::from_str(user.as_str()).ok(),
        None => None,
    };

    let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id)
        .with_team(team)
        .with_user(user);
    tenant_ctx.trace_id = ctx.trace_id;
    tenant_ctx
}
978
979fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
980 match err {
981 LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
982 LegacyIfaceError::NotFound => types::IfaceError::NotFound,
983 LegacyIfaceError::Denied => types::IfaceError::Denied,
984 LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
985 LegacyIfaceError::Internal => types::IfaceError::Internal,
986 }
987}
988
/// Flow information read from a pack archive's manifest, used when the component itself
/// could not be loaded.
struct ArchiveFlows {
    /// One descriptor per flow listed in the manifest.
    descriptors: Vec<FlowDescriptor>,
    /// Stub IR per flow, keyed by flow id.
    flows: HashMap<String, FlowIR>,
    /// Pack metadata derived from the manifest.
    metadata: PackMetadata,
}
994
995impl ArchiveFlows {
996 fn from_archive(path: &Path) -> Result<Self> {
997 let file = File::open(path)
998 .with_context(|| format!("failed to open archive {}", path.display()))?;
999 let mut archive = ZipArchive::new(file)
1000 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1001 let manifest_bytes = read_entry(&mut archive, "manifest.cbor")?;
1002 let manifest: greentic_pack::builder::PackManifest =
1003 serde_cbor::from_slice(&manifest_bytes).context("manifest.cbor is invalid")?;
1004
1005 let mut flows = HashMap::new();
1006 let mut descriptors = Vec::new();
1007 for flow in &manifest.flows {
1008 let ir = build_stub_flow_ir(&flow.id, &flow.kind);
1009 flows.insert(flow.id.clone(), ir);
1010 descriptors.push(FlowDescriptor {
1011 id: flow.id.clone(),
1012 flow_type: flow.kind.clone(),
1013 profile: manifest.meta.name.clone(),
1014 version: manifest.meta.version.to_string(),
1015 description: Some(flow.kind.clone()),
1016 });
1017 }
1018
1019 Ok(Self {
1020 metadata: PackMetadata::from_manifest(&manifest),
1021 descriptors,
1022 flows,
1023 })
1024 }
1025}
1026
1027fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1028 let mut file = archive
1029 .by_name(name)
1030 .with_context(|| format!("entry {name} missing from archive"))?;
1031 let mut buf = Vec::new();
1032 file.read_to_end(&mut buf)?;
1033 Ok(buf)
1034}
1035
1036fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1037 let mut nodes = IndexMap::new();
1038 nodes.insert(
1039 "complete".into(),
1040 NodeIR {
1041 component: "qa.process".into(),
1042 payload_expr: json!({
1043 "status": "done",
1044 "flow_id": flow_id,
1045 }),
1046 routes: vec![RouteIR {
1047 to: None,
1048 out: true,
1049 }],
1050 },
1051 );
1052 FlowIR {
1053 id: flow_id.to_string(),
1054 flow_type: flow_type.to_string(),
1055 start: Some("complete".into()),
1056 parameters: Value::Object(Default::default()),
1057 nodes,
1058 }
1059}
1060
/// Identifying metadata of a pack.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Pack identifier.
    pub pack_id: String,
    /// Pack version string.
    pub version: String,
    /// Ids of the pack's entry flows; defaults to empty when absent on input.
    #[serde(default)]
    pub entry_flows: Vec<String>,
}
1068
1069impl PackMetadata {
1070 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1071 let parser = Parser::new(0);
1072 for payload in parser.parse_all(bytes) {
1073 let payload = payload.ok()?;
1074 match payload {
1075 Payload::CustomSection(section) => {
1076 if section.name() == "greentic.manifest"
1077 && let Ok(meta) = Self::from_bytes(section.data())
1078 {
1079 return Some(meta);
1080 }
1081 }
1082 Payload::DataSection(reader) => {
1083 for segment in reader.into_iter().flatten() {
1084 if let Ok(meta) = Self::from_bytes(segment.data) {
1085 return Some(meta);
1086 }
1087 }
1088 }
1089 _ => {}
1090 }
1091 }
1092 None
1093 }
1094
1095 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1096 #[derive(Deserialize)]
1097 struct RawManifest {
1098 pack_id: String,
1099 version: String,
1100 #[serde(default)]
1101 entry_flows: Vec<String>,
1102 #[serde(default)]
1103 flows: Vec<RawFlow>,
1104 }
1105
1106 #[derive(Deserialize)]
1107 struct RawFlow {
1108 id: String,
1109 }
1110
1111 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1112 let mut entry_flows = if manifest.entry_flows.is_empty() {
1113 manifest.flows.iter().map(|f| f.id.clone()).collect()
1114 } else {
1115 manifest.entry_flows.clone()
1116 };
1117 entry_flows.retain(|id| !id.is_empty());
1118 Ok(Self {
1119 pack_id: manifest.pack_id,
1120 version: manifest.version,
1121 entry_flows,
1122 })
1123 }
1124
1125 pub fn fallback(path: &Path) -> Self {
1126 let pack_id = path
1127 .file_stem()
1128 .map(|s| s.to_string_lossy().into_owned())
1129 .unwrap_or_else(|| "unknown-pack".to_string());
1130 Self {
1131 pack_id,
1132 version: "0.0.0".to_string(),
1133 entry_flows: Vec::new(),
1134 }
1135 }
1136
1137 pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1138 let entry_flows = if manifest.meta.entry_flows.is_empty() {
1139 manifest
1140 .flows
1141 .iter()
1142 .map(|flow| flow.id.clone())
1143 .collect::<Vec<_>>()
1144 } else {
1145 manifest.meta.entry_flows.clone()
1146 };
1147 Self {
1148 pack_id: manifest.meta.pack_id.clone(),
1149 version: manifest.meta.version.to_string(),
1150 entry_flows,
1151 }
1152 }
1153}
1154
1155impl From<&HttpMockResponse> for LegacyHttpResponse {
1156 fn from(value: &HttpMockResponse) -> Self {
1157 let headers_json = serde_json::to_string(&value.headers).ok();
1158 Self {
1159 status: value.status,
1160 headers_json,
1161 body: value.body.clone(),
1162 }
1163 }
1164}