1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
9use crate::runtime_wasmtime::{Component, Engine, ResourceTable, WasmResult};
10use anyhow::{Context, Result, anyhow, bail};
11use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
12use greentic_interfaces_host::host_import::v0_2 as host_import_v0_2;
13use greentic_interfaces_host::host_import::v0_2::greentic::host_import::imports::{
14 HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
15 IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
16};
17use greentic_interfaces_host::host_import::v0_6::{
18 self as host_import_v0_6, iface_types, state, types,
19};
20#[cfg(feature = "mcp")]
21use greentic_mcp::{ExecConfig, ExecError, ExecRequest};
22use greentic_pack::reader::{SigningPolicy, open_pack};
23use greentic_session::SessionKey as StoreSessionKey;
24use greentic_types::{
25 EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
26 TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
27};
28use indexmap::IndexMap;
29use reqwest::blocking::Client as BlockingClient;
30use serde::{Deserialize, Serialize};
31use serde_cbor;
32use serde_json::{self, Value};
33use serde_yaml_bw as serde_yaml;
34use tokio::fs;
35use wasmparser::{Parser, Payload};
36use zip::ZipArchive;
37
38use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
39
40use crate::config::HostConfig;
41use crate::secrets::{DynSecretsManager, read_secret_blocking};
42use crate::storage::state::STATE_PREFIX;
43use crate::storage::{DynSessionStore, DynStateStore};
44use crate::verify;
45use crate::wasi::RunnerWasiPolicy;
46use tracing::warn;
47use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
48
/// A loaded pack: the (optional) compiled component together with the metadata,
/// flow cache, and host services captured by [`PackRuntime::load`].
// NOTE(review): `dead_code` is allowed presumably because several fields are only
// stored here and not yet read in this file — confirm before removing the allow.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Path that was passed to `load`.
    path: PathBuf,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Engine used to compile `component`.
    engine: Engine,
    /// Compiled component; `None` when compilation failed but an archive flow
    /// cache was available.
    component: Option<Component>,
    /// Pack metadata taken from the archive manifest, the wasm bytes, or a
    /// path-derived fallback.
    metadata: PackMetadata,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flow cache parsed from the pack archive, when one was available.
    flows: Option<PackFlows>,
    /// Optional session store handle.
    session_store: Option<DynSessionStore>,
    /// Optional state store handle.
    state_store: Option<DynStateStore>,
    /// WASI policy supplied at load time.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Secrets manager handle.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
}
64
/// Serializable summary of a single flow contained in a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile name (filled from the pack manifest's `meta.name` in this file).
    pub profile: String,
    /// Version string (filled from the pack manifest's `meta.version` in this file).
    pub version: String,
    /// Optional human-readable description; defaults to `None` when the field is
    /// missing during deserialization.
    #[serde(default)]
    pub description: Option<String>,
}
75
/// Host-side state backing the host import implementations in this file
/// (secrets, telemetry, HTTP, tool execution, state and session storage).
pub struct HostState {
    /// Shared host configuration (tenant, secrets policy, HTTP policy, ...).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used by `http_fetch`.
    http_client: BlockingClient,
    /// MCP exec configuration; `None` when it could not be obtained from `config`.
    #[cfg(feature = "mcp")]
    exec_config: Option<ExecConfig>,
    /// Environment name read from `GREENTIC_ENV`, or `"local"` when unset.
    default_env: String,
    /// Optional session store; session imports report `Unavailable` without it.
    session_store: Option<DynSessionStore>,
    /// Optional state store; state imports report `Unavailable` without it.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real secrets/HTTP/tool/telemetry work.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used when no mock provides the secret.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration exposed through `OAuthHostContext`.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state exposed through `OAuthHostContext`.
    oauth_host: OAuthBrokerHost,
}
89
90impl HostState {
91 #[allow(clippy::default_constructed_unit_structs)]
92 pub fn new(
93 config: Arc<HostConfig>,
94 mocks: Option<Arc<MockLayer>>,
95 session_store: Option<DynSessionStore>,
96 state_store: Option<DynStateStore>,
97 secrets: DynSecretsManager,
98 oauth_config: Option<OAuthBrokerConfig>,
99 ) -> Result<Self> {
100 let http_client = BlockingClient::builder().build()?;
101 #[cfg(feature = "mcp")]
102 let exec_config = config.mcp_exec_config().ok();
103 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
104 Ok(Self {
105 config,
106 http_client,
107 #[cfg(feature = "mcp")]
108 exec_config,
109 default_env,
110 session_store,
111 state_store,
112 mocks,
113 secrets,
114 oauth_config,
115 oauth_host: OAuthBrokerHost::default(),
116 })
117 }
118
119 pub fn get_secret(&self, key: &str) -> Result<String> {
120 if !self.config.secrets_policy.is_allowed(key) {
121 bail!("secret {key} is not permitted by bindings policy");
122 }
123 if let Some(mock) = &self.mocks
124 && let Some(value) = mock.secrets_lookup(key)
125 {
126 return Ok(value);
127 }
128 let bytes = read_secret_blocking(&self.secrets, key)
129 .context("failed to read secret from manager")?;
130 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
131 Ok(value)
132 }
133
134 fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
135 let tenant_raw = ctx
136 .as_ref()
137 .map(|ctx| ctx.tenant.clone())
138 .unwrap_or_else(|| self.config.tenant.clone());
139 let tenant_id = TenantId::from_str(&tenant_raw)
140 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
141 let env_id = EnvId::from_str(&self.default_env)
142 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
143 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
144 if let Some(ctx) = ctx {
145 if let Some(team) = ctx.team {
146 let team_id =
147 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
148 tenant_ctx = tenant_ctx.with_team(Some(team_id));
149 }
150 if let Some(user) = ctx.user {
151 let user_id =
152 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
153 tenant_ctx = tenant_ctx.with_user(Some(user_id));
154 }
155 if let Some(flow) = ctx.flow_id {
156 tenant_ctx = tenant_ctx.with_flow(flow);
157 }
158 if let Some(node) = ctx.node_id {
159 tenant_ctx = tenant_ctx.with_node(node);
160 }
161 if let Some(provider) = ctx.provider_id {
162 tenant_ctx = tenant_ctx.with_provider(provider);
163 }
164 if let Some(session) = ctx.session_id {
165 tenant_ctx = tenant_ctx.with_session(session);
166 }
167 tenant_ctx.trace_id = ctx.trace_id;
168 }
169 Ok(tenant_ctx)
170 }
171
172 fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
173 self.session_store
174 .as_ref()
175 .cloned()
176 .ok_or(types::IfaceError::Unavailable)
177 }
178
179 fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
180 self.state_store
181 .as_ref()
182 .cloned()
183 .ok_or(types::IfaceError::Unavailable)
184 }
185
186 fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
187 ctx.user_id
188 .clone()
189 .or_else(|| ctx.user.clone())
190 .ok_or(types::IfaceError::InvalidArg)
191 }
192
193 fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
194 let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
195 FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
196 }
197
198 fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
199 let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
200 if let Some(reason) = cursor.wait_reason {
201 store_cursor = store_cursor.with_wait_reason(reason);
202 }
203 if let Some(marker) = cursor.outbox_marker {
204 store_cursor = store_cursor.with_outbox_marker(marker);
205 }
206 store_cursor
207 }
208}
209
/// Per-component store data: the host state plus the WASI context and resource
/// table handed out through `WasiView`.
pub struct ComponentState {
    /// Host services that the host import implementations delegate to.
    host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table paired with `wasi_ctx` in the `WasiCtxView`.
    resource_table: ResourceTable,
}
215
216impl ComponentState {
217 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
218 let wasi_ctx = policy
219 .instantiate()
220 .context("failed to build WASI context")?;
221 Ok(Self {
222 host,
223 wasi_ctx,
224 resource_table: ResourceTable::new(),
225 })
226 }
227
228 fn host_mut(&mut self) -> &mut HostState {
229 &mut self.host
230 }
231}
232
233impl OAuthHostContext for ComponentState {
234 fn tenant_id(&self) -> &str {
235 &self.host.config.tenant
236 }
237
238 fn env(&self) -> &str {
239 &self.host.default_env
240 }
241
242 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
243 &mut self.host.oauth_host
244 }
245
246 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
247 self.host.oauth_config.as_ref()
248 }
249}
250
251impl WasiView for ComponentState {
252 fn ctx(&mut self) -> WasiCtxView<'_> {
253 WasiCtxView {
254 ctx: &mut self.wasi_ctx,
255 table: &mut self.resource_table,
256 }
257 }
258}
259
// SAFETY: NOTE(review) — no invariant is documented for these impls. They assert
// that `ComponentState` (and therefore `HostState`, `WasiCtx`, and
// `ResourceTable`) may be moved to and shared between threads; the compiler does
// not check this for an `unsafe impl`. TODO: confirm which field prevents the
// auto traits from applying and record why overriding it is sound.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
// SAFETY: see the review note on the `Send` impl directly above — the same
// unverified assumption applies to shared references across threads.
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
264
265impl host_import_v0_6::HostImports for ComponentState {
266 fn secrets_get(
267 &mut self,
268 key: String,
269 ctx: Option<types::TenantCtx>,
270 ) -> WasmResult<Result<String, types::IfaceError>> {
271 host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
272 }
273
274 fn telemetry_emit(
275 &mut self,
276 span_json: String,
277 ctx: Option<types::TenantCtx>,
278 ) -> WasmResult<()> {
279 host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
280 }
281
282 fn http_fetch(
283 &mut self,
284 req: host_import_v0_6::http::HttpRequest,
285 ctx: Option<types::TenantCtx>,
286 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
287 host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
288 }
289
290 fn mcp_exec(
291 &mut self,
292 component: String,
293 action: String,
294 args_json: String,
295 ctx: Option<types::TenantCtx>,
296 ) -> WasmResult<Result<String, types::IfaceError>> {
297 host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
298 }
299
300 fn state_get(
301 &mut self,
302 key: iface_types::StateKey,
303 ctx: Option<types::TenantCtx>,
304 ) -> WasmResult<Result<String, types::IfaceError>> {
305 host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
306 }
307
308 fn state_set(
309 &mut self,
310 key: iface_types::StateKey,
311 value_json: String,
312 ctx: Option<types::TenantCtx>,
313 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
314 host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
315 }
316
317 fn session_update(
318 &mut self,
319 cursor: iface_types::SessionCursor,
320 ctx: Option<types::TenantCtx>,
321 ) -> WasmResult<Result<String, types::IfaceError>> {
322 host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
323 }
324}
325
326impl host_import_v0_2::HostImports for ComponentState {
327 fn secrets_get(
328 &mut self,
329 key: String,
330 ctx: Option<LegacyTenantCtx>,
331 ) -> WasmResult<Result<String, LegacyIfaceError>> {
332 host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
333 }
334
335 fn telemetry_emit(
336 &mut self,
337 span_json: String,
338 ctx: Option<LegacyTenantCtx>,
339 ) -> WasmResult<()> {
340 host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
341 }
342
343 fn tool_invoke(
344 &mut self,
345 tool: String,
346 action: String,
347 args_json: String,
348 ctx: Option<LegacyTenantCtx>,
349 ) -> WasmResult<Result<String, LegacyIfaceError>> {
350 host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
351 }
352
353 fn http_fetch(
354 &mut self,
355 req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
356 ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
357 ) -> WasmResult<
358 Result<
359 host_import_v0_2::greentic::host_import::imports::HttpResponse,
360 host_import_v0_2::greentic::host_import::imports::IfaceError,
361 >,
362 > {
363 host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
364 }
365}
366
367impl host_import_v0_6::HostImports for HostState {
368 fn secrets_get(
369 &mut self,
370 key: String,
371 _ctx: Option<types::TenantCtx>,
372 ) -> WasmResult<Result<String, types::IfaceError>> {
373 Ok(self.get_secret(&key).map_err(|err| {
374 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
375 types::IfaceError::Denied
376 }))
377 }
378
379 fn telemetry_emit(
380 &mut self,
381 span_json: String,
382 _ctx: Option<types::TenantCtx>,
383 ) -> WasmResult<()> {
384 if let Some(mock) = &self.mocks
385 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
386 {
387 return Ok(());
388 }
389 tracing::info!(span = %span_json, "telemetry emit from pack");
390 Ok(())
391 }
392
393 fn http_fetch(
394 &mut self,
395 req: host_import_v0_6::http::HttpRequest,
396 _ctx: Option<types::TenantCtx>,
397 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
398 let legacy_req = LegacyHttpRequest {
399 method: req.method,
400 url: req.url,
401 headers_json: req.headers_json,
402 body: req.body,
403 };
404 match host_import_v0_2::HostImports::http_fetch(self, legacy_req, None)? {
405 Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
406 status: resp.status,
407 headers_json: resp.headers_json,
408 body: resp.body,
409 })),
410 Err(err) => Ok(Err(map_legacy_error(err))),
411 }
412 }
413
414 fn mcp_exec(
415 &mut self,
416 component: String,
417 action: String,
418 args_json: String,
419 ctx: Option<types::TenantCtx>,
420 ) -> WasmResult<Result<String, types::IfaceError>> {
421 #[cfg(not(feature = "mcp"))]
422 {
423 let _ = (component, action, args_json, ctx);
424 tracing::warn!("mcp.exec requested but crate built without `mcp` feature");
425 return Ok(Err(types::IfaceError::Unavailable));
426 }
427 #[cfg(feature = "mcp")]
428 {
429 let exec_config = match &self.exec_config {
430 Some(cfg) => cfg.clone(),
431 None => {
432 tracing::warn!(component = %component, action = %action, "exec config unavailable for tool invoke");
433 return Ok(Err(types::IfaceError::Unavailable));
434 }
435 };
436 let args: Value = match serde_json::from_str(&args_json) {
437 Ok(value) => value,
438 Err(err) => {
439 tracing::warn!(error = %err, "invalid args for mcp.exec node");
440 return Ok(Err(types::IfaceError::InvalidArg));
441 }
442 };
443 let tenant = match self.tenant_ctx_from_v6(ctx) {
444 Ok(ctx) => Some(ctx),
445 Err(err) => {
446 tracing::warn!(error = %err, "failed to parse tenant context for mcp.exec");
447 None
448 }
449 };
450 let request = ExecRequest {
451 component: component.clone(),
452 action: action.clone(),
453 args,
454 tenant,
455 };
456 match greentic_mcp::exec(request, &exec_config) {
457 Ok(value) => match serde_json::to_string(&value) {
458 Ok(body) => Ok(Ok(body)),
459 Err(err) => {
460 tracing::error!(error = %err, "failed to serialise tool result");
461 Ok(Err(types::IfaceError::Internal))
462 }
463 },
464 Err(err) => {
465 tracing::warn!(component = %component, action = %action, error = %err, "mcp exec failed");
466 let iface_err = match err {
467 ExecError::NotFound { .. } => types::IfaceError::NotFound,
468 ExecError::Tool { .. } => types::IfaceError::Denied,
469 _ => types::IfaceError::Unavailable,
470 };
471 Ok(Err(iface_err))
472 }
473 }
474 }
475 }
476
477 fn state_get(
478 &mut self,
479 key: iface_types::StateKey,
480 ctx: Option<types::TenantCtx>,
481 ) -> WasmResult<Result<String, types::IfaceError>> {
482 let store = match self.state_store_handle() {
483 Ok(store) => store,
484 Err(err) => return Ok(Err(err)),
485 };
486 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
487 Ok(ctx) => ctx,
488 Err(err) => {
489 tracing::warn!(error = %err, "invalid tenant context for state.get");
490 return Ok(Err(types::IfaceError::InvalidArg));
491 }
492 };
493 let key = StoreStateKey::from(key);
494 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
495 Ok(Some(value)) => {
496 let result = if let Some(text) = value.as_str() {
497 text.to_string()
498 } else {
499 serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
500 };
501 Ok(Ok(result))
502 }
503 Ok(None) => Ok(Err(types::IfaceError::NotFound)),
504 Err(err) => {
505 tracing::warn!(error = %err, "state.get failed");
506 Ok(Err(types::IfaceError::Internal))
507 }
508 }
509 }
510
511 fn state_set(
512 &mut self,
513 key: iface_types::StateKey,
514 value_json: String,
515 ctx: Option<types::TenantCtx>,
516 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
517 let store = match self.state_store_handle() {
518 Ok(store) => store,
519 Err(err) => return Ok(Err(err)),
520 };
521 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
522 Ok(ctx) => ctx,
523 Err(err) => {
524 tracing::warn!(error = %err, "invalid tenant context for state.set");
525 return Ok(Err(types::IfaceError::InvalidArg));
526 }
527 };
528 let key = StoreStateKey::from(key);
529 let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
530 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
531 Ok(()) => Ok(Ok(state::OpAck::Ok)),
532 Err(err) => {
533 tracing::warn!(error = %err, "state.set failed");
534 Ok(Err(types::IfaceError::Internal))
535 }
536 }
537 }
538
539 fn session_update(
540 &mut self,
541 cursor: iface_types::SessionCursor,
542 ctx: Option<types::TenantCtx>,
543 ) -> WasmResult<Result<String, types::IfaceError>> {
544 let store = match self.session_store_handle() {
545 Ok(store) => store,
546 Err(err) => return Ok(Err(err)),
547 };
548 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
549 Ok(ctx) => ctx,
550 Err(err) => {
551 tracing::warn!(error = %err, "invalid tenant context for session.update");
552 return Ok(Err(types::IfaceError::InvalidArg));
553 }
554 };
555 let user = match Self::ensure_user(&tenant_ctx) {
556 Ok(user) => user,
557 Err(err) => return Ok(Err(err)),
558 };
559 let flow_id = match Self::ensure_flow(&tenant_ctx) {
560 Ok(flow) => flow,
561 Err(err) => return Ok(Err(err)),
562 };
563 let cursor = Self::cursor_from_iface(cursor);
564 let payload = SessionData {
565 tenant_ctx: tenant_ctx.clone(),
566 flow_id,
567 cursor: cursor.clone(),
568 context_json: serde_json::json!({
569 "node_pointer": cursor.node_pointer,
570 "wait_reason": cursor.wait_reason,
571 "outbox_marker": cursor.outbox_marker,
572 })
573 .to_string(),
574 };
575 if let Some(existing) = tenant_ctx.session_id() {
576 let key = StoreSessionKey::from(existing.to_string());
577 if let Err(err) = store.update_session(&key, payload) {
578 tracing::error!(error = %err, "failed to update session snapshot");
579 return Ok(Err(types::IfaceError::Internal));
580 }
581 return Ok(Ok(existing.to_string()));
582 }
583 match store.find_by_user(&tenant_ctx, &user) {
584 Ok(Some((key, _))) => {
585 if let Err(err) = store.update_session(&key, payload) {
586 tracing::error!(error = %err, "failed to update existing user session");
587 return Ok(Err(types::IfaceError::Internal));
588 }
589 return Ok(Ok(key.to_string()));
590 }
591 Ok(None) => {}
592 Err(err) => {
593 tracing::error!(error = %err, "session lookup failed");
594 return Ok(Err(types::IfaceError::Internal));
595 }
596 }
597 let key = match store.create_session(&tenant_ctx, payload.clone()) {
598 Ok(key) => key,
599 Err(err) => {
600 tracing::error!(error = %err, "failed to create session");
601 return Ok(Err(types::IfaceError::Internal));
602 }
603 };
604 let ctx_with_session = tenant_ctx.with_session(key.to_string());
605 let updated_payload = SessionData {
606 tenant_ctx: ctx_with_session.clone(),
607 ..payload
608 };
609 if let Err(err) = store.update_session(&key, updated_payload) {
610 tracing::warn!(error = %err, "failed to stamp session id after create");
611 }
612 Ok(Ok(key.to_string()))
613 }
614}
615
616impl host_import_v0_2::HostImports for HostState {
617 fn secrets_get(
618 &mut self,
619 key: String,
620 _ctx: Option<LegacyTenantCtx>,
621 ) -> WasmResult<Result<String, LegacyIfaceError>> {
622 Ok(self.get_secret(&key).map_err(|err| {
623 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
624 LegacyIfaceError::Denied
625 }))
626 }
627
628 fn telemetry_emit(
629 &mut self,
630 span_json: String,
631 _ctx: Option<LegacyTenantCtx>,
632 ) -> WasmResult<()> {
633 if let Some(mock) = &self.mocks
634 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
635 {
636 return Ok(());
637 }
638 tracing::info!(span = %span_json, "telemetry emit from pack");
639 Ok(())
640 }
641
642 fn tool_invoke(
643 &mut self,
644 tool: String,
645 action: String,
646 args_json: String,
647 ctx: Option<LegacyTenantCtx>,
648 ) -> WasmResult<Result<String, LegacyIfaceError>> {
649 #[cfg(not(feature = "mcp"))]
650 {
651 let _ = (tool, action, args_json, ctx);
652 tracing::warn!("tool invoke requested but crate built without `mcp` feature");
653 return Ok(Err(LegacyIfaceError::Unavailable));
654 }
655
656 #[cfg(feature = "mcp")]
657 {
658 let exec_config = match &self.exec_config {
659 Some(cfg) => cfg.clone(),
660 None => {
661 tracing::warn!(%tool, %action, "exec config unavailable for tool invoke");
662 return Ok(Err(LegacyIfaceError::Unavailable));
663 }
664 };
665
666 let args: Value = match serde_json::from_str(&args_json) {
667 Ok(value) => value,
668 Err(err) => {
669 tracing::warn!(error = %err, "invalid args for tool invoke");
670 return Ok(Err(LegacyIfaceError::InvalidArg));
671 }
672 };
673
674 let tenant = ctx.map(|ctx| map_legacy_tenant_ctx(ctx, &self.default_env));
675
676 let request = ExecRequest {
677 component: tool.clone(),
678 action: action.clone(),
679 args,
680 tenant,
681 };
682
683 if let Some(mock) = &self.mocks
684 && let Some(result) = mock.tool_short_circuit(&tool, &action)
685 {
686 return match result.and_then(|value| {
687 serde_json::to_string(&value)
688 .map_err(|err| anyhow!("failed to serialise mock tool output: {err}"))
689 }) {
690 Ok(body) => Ok(Ok(body)),
691 Err(err) => {
692 tracing::error!(error = %err, "mock tool execution failed");
693 Ok(Err(LegacyIfaceError::Internal))
694 }
695 };
696 }
697
698 match greentic_mcp::exec(request, &exec_config) {
699 Ok(value) => match serde_json::to_string(&value) {
700 Ok(body) => Ok(Ok(body)),
701 Err(err) => {
702 tracing::error!(error = %err, "failed to serialise tool result");
703 Ok(Err(LegacyIfaceError::Internal))
704 }
705 },
706 Err(err) => {
707 tracing::warn!(%tool, %action, error = %err, "tool invoke failed");
708 let iface_err = match err {
709 ExecError::NotFound { .. } => LegacyIfaceError::NotFound,
710 ExecError::Tool { .. } => LegacyIfaceError::Denied,
711 _ => LegacyIfaceError::Unavailable,
712 };
713 Ok(Err(iface_err))
714 }
715 }
716 }
717 }
718
719 fn http_fetch(
720 &mut self,
721 req: LegacyHttpRequest,
722 _ctx: Option<LegacyTenantCtx>,
723 ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
724 if !self.config.http_enabled {
725 tracing::warn!(url = %req.url, "http fetch denied by policy");
726 return Ok(Err(LegacyIfaceError::Denied));
727 }
728
729 let mut mock_state = None;
730 let raw_body = req.body.clone();
731 if let Some(mock) = &self.mocks
732 && let Ok(meta) = HttpMockRequest::new(
733 &req.method,
734 &req.url,
735 raw_body.as_deref().map(|body| body.as_bytes()),
736 )
737 {
738 match mock.http_begin(&meta) {
739 HttpDecision::Mock(response) => {
740 let http = LegacyHttpResponse::from(&response);
741 return Ok(Ok(http));
742 }
743 HttpDecision::Deny(reason) => {
744 tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
745 return Ok(Err(LegacyIfaceError::Denied));
746 }
747 HttpDecision::Passthrough { record } => {
748 mock_state = Some((meta, record));
749 }
750 }
751 }
752
753 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
754 let mut builder = self.http_client.request(method, &req.url);
755
756 if let Some(headers_json) = req.headers_json.as_ref() {
757 match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
758 Ok(map) => {
759 for (key, value) in map {
760 if let Some(val) = value.as_str()
761 && let Ok(header) =
762 reqwest::header::HeaderName::from_bytes(key.as_bytes())
763 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
764 {
765 builder = builder.header(header, header_value);
766 }
767 }
768 }
769 Err(err) => {
770 tracing::warn!(error = %err, "failed to parse headers for http.fetch");
771 }
772 }
773 }
774
775 if let Some(body) = raw_body.clone() {
776 builder = builder.body(body);
777 }
778
779 let response = match builder.send() {
780 Ok(resp) => resp,
781 Err(err) => {
782 tracing::error!(url = %req.url, error = %err, "http fetch failed");
783 return Ok(Err(LegacyIfaceError::Unavailable));
784 }
785 };
786
787 let status = response.status().as_u16();
788 let headers_map = response
789 .headers()
790 .iter()
791 .map(|(k, v)| {
792 (
793 k.as_str().to_string(),
794 v.to_str().unwrap_or_default().to_string(),
795 )
796 })
797 .collect::<BTreeMap<_, _>>();
798 let headers_json = serde_json::to_string(&headers_map).ok();
799 let body = response.text().ok();
800
801 if let Some((meta, true)) = mock_state.take()
802 && let Some(mock) = &self.mocks
803 {
804 let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
805 mock.http_record(&meta, &recorded);
806 }
807
808 Ok(Ok(LegacyHttpResponse {
809 status,
810 headers_json,
811 body,
812 }))
813 }
814}
815
816impl PackRuntime {
817 #[allow(clippy::too_many_arguments)]
818 pub async fn load(
819 path: impl AsRef<Path>,
820 config: Arc<HostConfig>,
821 mocks: Option<Arc<MockLayer>>,
822 archive_source: Option<&Path>,
823 session_store: Option<DynSessionStore>,
824 state_store: Option<DynStateStore>,
825 wasi_policy: Arc<RunnerWasiPolicy>,
826 secrets: DynSecretsManager,
827 oauth_config: Option<OAuthBrokerConfig>,
828 verify_archive: bool,
829 ) -> Result<Self> {
830 let path = path.as_ref();
831 let is_component = path
832 .extension()
833 .and_then(|ext| ext.to_str())
834 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
835 .unwrap_or(false);
836 let archive_hint = archive_source.or(if is_component { None } else { Some(path) });
837 if verify_archive {
838 let verify_target = archive_hint.unwrap_or(path);
839 verify::verify_pack(verify_target).await?;
840 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
841 }
842 let engine = Engine::default();
843 let wasm_bytes = fs::read(path).await?;
844 let mut metadata =
845 PackMetadata::from_wasm(&wasm_bytes).unwrap_or_else(|| PackMetadata::fallback(path));
846 let mut flows = if let Some(archive_path) = archive_hint {
847 match PackFlows::from_archive(archive_path) {
848 Ok(cache) => {
849 metadata = cache.metadata.clone();
850 Some(cache)
851 }
852 Err(err) => {
853 warn!(
854 error = %err,
855 pack = %archive_path.display(),
856 "failed to parse pack flows via greentic-pack; falling back to component exports"
857 );
858 None
859 }
860 }
861 } else {
862 None
863 };
864 let component = match Component::from_file(&engine, path) {
865 Ok(component) => Some(component),
866 Err(err) => {
867 if let Some(archive_path) = archive_hint {
868 if flows.is_none() {
869 let archive_data =
870 PackFlows::from_archive(archive_path).with_context(|| {
871 format!(
872 "failed to build flow cache from {}",
873 archive_path.display()
874 )
875 })?;
876 metadata = archive_data.metadata.clone();
877 flows = Some(archive_data);
878 }
879 warn!(
880 error = %err,
881 pack = %archive_path.display(),
882 "component load failed; continuing with cached flow metadata"
883 );
884 None
885 } else {
886 return Err(err);
887 }
888 }
889 };
890 Ok(Self {
891 path: path.to_path_buf(),
892 config,
893 engine,
894 component,
895 metadata,
896 mocks,
897 flows,
898 session_store,
899 state_store,
900 wasi_policy,
901 secrets,
902 oauth_config,
903 })
904 }
905
906 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
907 if let Some(cache) = &self.flows {
908 return Ok(cache.descriptors.clone());
909 }
910 Ok(Vec::new())
911 }
912
913 #[allow(dead_code)]
914 pub async fn run_flow(
915 &self,
916 _flow_id: &str,
917 _input: serde_json::Value,
918 ) -> Result<serde_json::Value> {
919 Ok(serde_json::json!({}))
921 }
922
923 pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
924 if let Some(cache) = &self.flows {
925 return cache
926 .flows
927 .get(flow_id)
928 .cloned()
929 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
930 }
931 bail!("flow '{flow_id}' not available (pack exports disabled)")
932 }
933
934 pub fn metadata(&self) -> &PackMetadata {
935 &self.metadata
936 }
937}
938
/// Converts a legacy (v0.2) guest tenant context into the typed tenant context.
///
/// The environment is taken from `ctx.deployment.runtime`, falling back to
/// `default_env` and finally to `"local"` when the value does not parse (the
/// same last-resort default used for v0.6 contexts). Team and user ids that do
/// not parse are dropped.
///
/// # Panics
/// Panics when `ctx.tenant` is not a valid tenant id.
// NOTE(review): the tenant id is guest-supplied; removing this panic needs a
// fallible signature and a matching change in the caller.
#[cfg(feature = "mcp")]
fn map_legacy_tenant_ctx(ctx: LegacyTenantCtx, default_env: &str) -> TypesTenantCtx {
    let env = ctx
        .deployment
        .runtime
        .unwrap_or_else(|| default_env.to_string());

    // A malformed environment name must not bring the host down.
    let env_id = EnvId::from_str(env.as_str())
        .or_else(|_| {
            tracing::warn!(env = %env, "invalid env id in legacy tenant context; using default");
            EnvId::from_str(default_env)
        })
        .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
    let tenant_id = TenantId::from_str(ctx.tenant.as_str()).expect("invalid tenant id");
    let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
    tenant_ctx = tenant_ctx.with_team(
        ctx.team
            .as_ref()
            .and_then(|team| TeamId::from_str(team.as_str()).ok()),
    );
    tenant_ctx = tenant_ctx.with_user(
        ctx.user
            .as_ref()
            .and_then(|user| UserId::from_str(user.as_str()).ok()),
    );
    tenant_ctx.trace_id = ctx.trace_id;
    tenant_ctx
}
962
963fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
964 match err {
965 LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
966 LegacyIfaceError::NotFound => types::IfaceError::NotFound,
967 LegacyIfaceError::Denied => types::IfaceError::Denied,
968 LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
969 LegacyIfaceError::Internal => types::IfaceError::Internal,
970 }
971}
972
/// Flow data extracted from a pack archive.
struct PackFlows {
    /// One descriptor per flow listed in the pack manifest.
    descriptors: Vec<FlowDescriptor>,
    /// Flow IR keyed by flow id.
    flows: HashMap<String, FlowIR>,
    /// Metadata derived from the pack manifest.
    metadata: PackMetadata,
}
978
979impl PackFlows {
980 fn from_archive(path: &Path) -> Result<Self> {
981 let pack = open_pack(path, SigningPolicy::DevOk)
982 .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
983 let file = File::open(path)
984 .with_context(|| format!("failed to open archive {}", path.display()))?;
985 let mut archive = ZipArchive::new(file)
986 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
987 let mut flows = HashMap::new();
988 let mut descriptors = Vec::new();
989 for flow in &pack.manifest.flows {
990 match load_flow_entry(&mut archive, flow, &pack.manifest.meta) {
991 Ok((descriptor, ir)) => {
992 flows.insert(descriptor.id.clone(), ir);
993 descriptors.push(descriptor);
994 }
995 Err(err) => {
996 warn!(
997 error = %err,
998 flow_id = flow.id,
999 "failed to parse flow metadata from archive; using stub flow"
1000 );
1001 let stub = build_stub_flow_ir(&flow.id, &flow.kind);
1002 flows.insert(flow.id.clone(), stub.clone());
1003 descriptors.push(FlowDescriptor {
1004 id: flow.id.clone(),
1005 flow_type: flow.kind.clone(),
1006 profile: pack.manifest.meta.name.clone(),
1007 version: pack.manifest.meta.version.to_string(),
1008 description: Some(flow.kind.clone()),
1009 });
1010 }
1011 }
1012 }
1013
1014 Ok(Self {
1015 metadata: PackMetadata::from_manifest(&pack.manifest),
1016 descriptors,
1017 flows,
1018 })
1019 }
1020}
1021
1022fn load_flow_entry(
1023 archive: &mut ZipArchive<File>,
1024 entry: &greentic_pack::builder::FlowEntry,
1025 meta: &greentic_pack::builder::PackMeta,
1026) -> Result<(FlowDescriptor, FlowIR)> {
1027 let doc = load_flow_doc(archive, entry)?;
1028 let ir = greentic_flow::to_ir(doc.clone())
1029 .with_context(|| format!("failed to build IR for flow {}", doc.id))?;
1030 let descriptor = FlowDescriptor {
1031 id: doc.id.clone(),
1032 flow_type: doc.flow_type.clone(),
1033 profile: meta.name.clone(),
1034 version: meta.version.to_string(),
1035 description: doc
1036 .description
1037 .clone()
1038 .or(doc.title.clone())
1039 .or_else(|| Some(entry.kind.clone())),
1040 };
1041 Ok((descriptor, ir))
1042}
1043
1044fn load_flow_doc(
1045 archive: &mut ZipArchive<File>,
1046 entry: &greentic_pack::builder::FlowEntry,
1047) -> Result<greentic_flow::model::FlowDoc> {
1048 if let Ok(bytes) = read_entry(archive, &entry.file_yaml)
1049 && let Ok(doc) = serde_yaml::from_slice(&bytes)
1050 {
1051 return Ok(doc);
1052 }
1053 let json_bytes = read_entry(archive, &entry.file_json)
1054 .with_context(|| format!("missing flow document {}", entry.file_json))?;
1055 let doc = serde_json::from_slice(&json_bytes)
1056 .with_context(|| format!("failed to parse {}", entry.file_json))?;
1057 Ok(doc)
1058}
1059
1060fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1061 let mut file = archive
1062 .by_name(name)
1063 .with_context(|| format!("entry {name} missing from archive"))?;
1064 let mut buf = Vec::new();
1065 file.read_to_end(&mut buf)?;
1066 Ok(buf)
1067}
1068
/// Identity and entry points of a pack.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Pack identifier.
    pub pack_id: String,
    /// Pack version string.
    pub version: String,
    /// Ids of the pack's entry flows; defaults to empty when the field is
    /// missing during deserialization.
    #[serde(default)]
    pub entry_flows: Vec<String>,
}
1076
1077impl PackMetadata {
1078 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1079 let parser = Parser::new(0);
1080 for payload in parser.parse_all(bytes) {
1081 let payload = payload.ok()?;
1082 match payload {
1083 Payload::CustomSection(section) => {
1084 if section.name() == "greentic.manifest"
1085 && let Ok(meta) = Self::from_bytes(section.data())
1086 {
1087 return Some(meta);
1088 }
1089 }
1090 Payload::DataSection(reader) => {
1091 for segment in reader.into_iter().flatten() {
1092 if let Ok(meta) = Self::from_bytes(segment.data) {
1093 return Some(meta);
1094 }
1095 }
1096 }
1097 _ => {}
1098 }
1099 }
1100 None
1101 }
1102
1103 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1104 #[derive(Deserialize)]
1105 struct RawManifest {
1106 pack_id: String,
1107 version: String,
1108 #[serde(default)]
1109 entry_flows: Vec<String>,
1110 #[serde(default)]
1111 flows: Vec<RawFlow>,
1112 }
1113
1114 #[derive(Deserialize)]
1115 struct RawFlow {
1116 id: String,
1117 }
1118
1119 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1120 let mut entry_flows = if manifest.entry_flows.is_empty() {
1121 manifest.flows.iter().map(|f| f.id.clone()).collect()
1122 } else {
1123 manifest.entry_flows.clone()
1124 };
1125 entry_flows.retain(|id| !id.is_empty());
1126 Ok(Self {
1127 pack_id: manifest.pack_id,
1128 version: manifest.version,
1129 entry_flows,
1130 })
1131 }
1132
1133 pub fn fallback(path: &Path) -> Self {
1134 let pack_id = path
1135 .file_stem()
1136 .map(|s| s.to_string_lossy().into_owned())
1137 .unwrap_or_else(|| "unknown-pack".to_string());
1138 Self {
1139 pack_id,
1140 version: "0.0.0".to_string(),
1141 entry_flows: Vec::new(),
1142 }
1143 }
1144
1145 pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1146 let entry_flows = if manifest.meta.entry_flows.is_empty() {
1147 manifest
1148 .flows
1149 .iter()
1150 .map(|flow| flow.id.clone())
1151 .collect::<Vec<_>>()
1152 } else {
1153 manifest.meta.entry_flows.clone()
1154 };
1155 Self {
1156 pack_id: manifest.meta.pack_id.clone(),
1157 version: manifest.meta.version.to_string(),
1158 entry_flows,
1159 }
1160 }
1161}
1162
1163fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1164 let mut nodes = IndexMap::new();
1165 nodes.insert(
1166 "complete".into(),
1167 NodeIR {
1168 component: "qa.process".into(),
1169 payload_expr: serde_json::json!({
1170 "status": "done",
1171 "flow_id": flow_id,
1172 }),
1173 routes: vec![RouteIR {
1174 to: None,
1175 out: true,
1176 }],
1177 },
1178 );
1179 FlowIR {
1180 id: flow_id.to_string(),
1181 flow_type: flow_type.to_string(),
1182 start: Some("complete".into()),
1183 parameters: Value::Object(Default::default()),
1184 nodes,
1185 }
1186}
1187
1188impl From<&HttpMockResponse> for LegacyHttpResponse {
1189 fn from(value: &HttpMockResponse) -> Self {
1190 let headers_json = serde_json::to_string(&value.headers).ok();
1191 Self {
1192 status: value.status,
1193 headers_json,
1194 body: value.body.clone(),
1195 }
1196 }
1197}