1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
9use crate::component_api::{
10 ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::imports::register_all;
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, WasmResult};
15use anyhow::{Context, Result, anyhow, bail};
16use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
17use greentic_interfaces_host::host_import::v0_2 as host_import_v0_2;
18use greentic_interfaces_host::host_import::v0_2::greentic::host_import::imports::{
19 HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
20 IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
21};
22use greentic_interfaces_host::host_import::v0_6::{
23 self as host_import_v0_6, iface_types, state, types,
24};
25use greentic_pack::reader::{SigningPolicy, open_pack};
26use greentic_session::SessionKey as StoreSessionKey;
27use greentic_types::{
28 EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
29 TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
30};
31use indexmap::IndexMap;
32use once_cell::sync::Lazy;
33use parking_lot::Mutex;
34use reqwest::blocking::Client as BlockingClient;
35use serde::{Deserialize, Serialize};
36use serde_cbor;
37use serde_json::{self, Value};
38use serde_yaml_bw as serde_yaml;
39use tokio::fs;
40use wasmparser::{Parser, Payload};
41use wasmtime::StoreContextMut;
42use zip::ZipArchive;
43
44use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
45
46use crate::config::HostConfig;
47use crate::secrets::{DynSecretsManager, read_secret_blocking};
48use crate::storage::state::STATE_PREFIX;
49use crate::storage::{DynSessionStore, DynStateStore};
50use crate::verify;
51use crate::wasi::RunnerWasiPolicy;
52use tracing::warn;
53use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
54
55#[allow(dead_code)]
56pub struct PackRuntime {
57 path: PathBuf,
58 config: Arc<HostConfig>,
59 engine: Engine,
60 metadata: PackMetadata,
61 manifest: Option<greentic_pack::builder::PackManifest>,
62 mocks: Option<Arc<MockLayer>>,
63 flows: Option<PackFlows>,
64 components: HashMap<String, PackComponent>,
65 http_client: Arc<BlockingClient>,
66 pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
67 session_store: Option<DynSessionStore>,
68 state_store: Option<DynStateStore>,
69 wasi_policy: Arc<RunnerWasiPolicy>,
70 secrets: DynSecretsManager,
71 oauth_config: Option<OAuthBrokerConfig>,
72}
73
74struct PackComponent {
75 #[allow(dead_code)]
76 name: String,
77 #[allow(dead_code)]
78 version: String,
79 component: Component,
80}
81
82fn build_blocking_client() -> BlockingClient {
83 std::thread::spawn(|| BlockingClient::builder().build().expect("blocking client"))
84 .join()
85 .expect("client build thread panicked")
86}
87
88static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct FlowDescriptor {
92 pub id: String,
93 #[serde(rename = "type")]
94 pub flow_type: String,
95 pub profile: String,
96 pub version: String,
97 #[serde(default)]
98 pub description: Option<String>,
99}
100
101pub struct HostState {
102 config: Arc<HostConfig>,
103 http_client: Arc<BlockingClient>,
104 default_env: String,
105 session_store: Option<DynSessionStore>,
106 state_store: Option<DynStateStore>,
107 mocks: Option<Arc<MockLayer>>,
108 secrets: DynSecretsManager,
109 oauth_config: Option<OAuthBrokerConfig>,
110 oauth_host: OAuthBrokerHost,
111}
112
113impl HostState {
114 #[allow(clippy::default_constructed_unit_structs)]
115 pub fn new(
116 config: Arc<HostConfig>,
117 http_client: Arc<BlockingClient>,
118 mocks: Option<Arc<MockLayer>>,
119 session_store: Option<DynSessionStore>,
120 state_store: Option<DynStateStore>,
121 secrets: DynSecretsManager,
122 oauth_config: Option<OAuthBrokerConfig>,
123 ) -> Result<Self> {
124 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
125 Ok(Self {
126 config,
127 http_client,
128 default_env,
129 session_store,
130 state_store,
131 mocks,
132 secrets,
133 oauth_config,
134 oauth_host: OAuthBrokerHost::default(),
135 })
136 }
137
138 pub fn get_secret(&self, key: &str) -> Result<String> {
139 if !self.config.secrets_policy.is_allowed(key) {
140 bail!("secret {key} is not permitted by bindings policy");
141 }
142 if let Some(mock) = &self.mocks
143 && let Some(value) = mock.secrets_lookup(key)
144 {
145 return Ok(value);
146 }
147 let bytes = read_secret_blocking(&self.secrets, key)
148 .context("failed to read secret from manager")?;
149 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
150 Ok(value)
151 }
152
153 fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
154 let tenant_raw = ctx
155 .as_ref()
156 .map(|ctx| ctx.tenant.clone())
157 .unwrap_or_else(|| self.config.tenant.clone());
158 let tenant_id = TenantId::from_str(&tenant_raw)
159 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
160 let env_id = EnvId::from_str(&self.default_env)
161 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
162 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
163 if let Some(ctx) = ctx {
164 if let Some(team) = ctx.team {
165 let team_id =
166 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
167 tenant_ctx = tenant_ctx.with_team(Some(team_id));
168 }
169 if let Some(user) = ctx.user {
170 let user_id =
171 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
172 tenant_ctx = tenant_ctx.with_user(Some(user_id));
173 }
174 if let Some(flow) = ctx.flow_id {
175 tenant_ctx = tenant_ctx.with_flow(flow);
176 }
177 if let Some(node) = ctx.node_id {
178 tenant_ctx = tenant_ctx.with_node(node);
179 }
180 if let Some(provider) = ctx.provider_id {
181 tenant_ctx = tenant_ctx.with_provider(provider);
182 }
183 if let Some(session) = ctx.session_id {
184 tenant_ctx = tenant_ctx.with_session(session);
185 }
186 tenant_ctx.trace_id = ctx.trace_id;
187 }
188 Ok(tenant_ctx)
189 }
190
191 fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
192 self.session_store
193 .as_ref()
194 .cloned()
195 .ok_or(types::IfaceError::Unavailable)
196 }
197
198 fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
199 self.state_store
200 .as_ref()
201 .cloned()
202 .ok_or(types::IfaceError::Unavailable)
203 }
204
205 fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
206 ctx.user_id
207 .clone()
208 .or_else(|| ctx.user.clone())
209 .ok_or(types::IfaceError::InvalidArg)
210 }
211
212 fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
213 let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
214 FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
215 }
216
217 fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
218 let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
219 if let Some(reason) = cursor.wait_reason {
220 store_cursor = store_cursor.with_wait_reason(reason);
221 }
222 if let Some(marker) = cursor.outbox_marker {
223 store_cursor = store_cursor.with_outbox_marker(marker);
224 }
225 store_cursor
226 }
227}
228
229pub struct ComponentState {
230 host: HostState,
231 wasi_ctx: WasiCtx,
232 resource_table: ResourceTable,
233}
234
235impl ComponentState {
236 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
237 let wasi_ctx = policy
238 .instantiate()
239 .context("failed to build WASI context")?;
240 Ok(Self {
241 host,
242 wasi_ctx,
243 resource_table: ResourceTable::new(),
244 })
245 }
246
247 fn host_mut(&mut self) -> &mut HostState {
248 &mut self.host
249 }
250}
251
252impl control::Host for ComponentState {
253 fn should_cancel(&mut self) -> bool {
254 false
255 }
256
257 fn yield_now(&mut self) {
258 }
260}
261
262fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
263 let mut inst = linker.instance("greentic:component/control@0.4.0")?;
264 inst.func_wrap(
265 "should-cancel",
266 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
267 let host = caller.data_mut();
268 Ok((ComponentControlHost::should_cancel(host),))
269 },
270 )?;
271 inst.func_wrap(
272 "yield-now",
273 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
274 let host = caller.data_mut();
275 ComponentControlHost::yield_now(host);
276 Ok(())
277 },
278 )?;
279 Ok(())
280}
281
282impl OAuthHostContext for ComponentState {
283 fn tenant_id(&self) -> &str {
284 &self.host.config.tenant
285 }
286
287 fn env(&self) -> &str {
288 &self.host.default_env
289 }
290
291 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
292 &mut self.host.oauth_host
293 }
294
295 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
296 self.host.oauth_config.as_ref()
297 }
298}
299
300impl WasiView for ComponentState {
301 fn ctx(&mut self) -> WasiCtxView<'_> {
302 WasiCtxView {
303 ctx: &mut self.wasi_ctx,
304 table: &mut self.resource_table,
305 }
306 }
307}
308
309#[allow(unsafe_code)]
310unsafe impl Send for ComponentState {}
311#[allow(unsafe_code)]
312unsafe impl Sync for ComponentState {}
313
314impl host_import_v0_6::HostImports for ComponentState {
315 fn secrets_get(
316 &mut self,
317 key: String,
318 ctx: Option<types::TenantCtx>,
319 ) -> WasmResult<Result<String, types::IfaceError>> {
320 host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
321 }
322
323 fn telemetry_emit(
324 &mut self,
325 span_json: String,
326 ctx: Option<types::TenantCtx>,
327 ) -> WasmResult<()> {
328 host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
329 }
330
331 fn http_fetch(
332 &mut self,
333 req: host_import_v0_6::http::HttpRequest,
334 ctx: Option<types::TenantCtx>,
335 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
336 host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
337 }
338
339 fn mcp_exec(
340 &mut self,
341 component: String,
342 action: String,
343 args_json: String,
344 ctx: Option<types::TenantCtx>,
345 ) -> WasmResult<Result<String, types::IfaceError>> {
346 host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
347 }
348
349 fn state_get(
350 &mut self,
351 key: iface_types::StateKey,
352 ctx: Option<types::TenantCtx>,
353 ) -> WasmResult<Result<String, types::IfaceError>> {
354 host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
355 }
356
357 fn state_set(
358 &mut self,
359 key: iface_types::StateKey,
360 value_json: String,
361 ctx: Option<types::TenantCtx>,
362 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
363 host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
364 }
365
366 fn session_update(
367 &mut self,
368 cursor: iface_types::SessionCursor,
369 ctx: Option<types::TenantCtx>,
370 ) -> WasmResult<Result<String, types::IfaceError>> {
371 host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
372 }
373}
374
375impl host_import_v0_2::HostImports for ComponentState {
376 fn secrets_get(
377 &mut self,
378 key: String,
379 ctx: Option<LegacyTenantCtx>,
380 ) -> WasmResult<Result<String, LegacyIfaceError>> {
381 host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
382 }
383
384 fn telemetry_emit(
385 &mut self,
386 span_json: String,
387 ctx: Option<LegacyTenantCtx>,
388 ) -> WasmResult<()> {
389 host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
390 }
391
392 fn tool_invoke(
393 &mut self,
394 tool: String,
395 action: String,
396 args_json: String,
397 ctx: Option<LegacyTenantCtx>,
398 ) -> WasmResult<Result<String, LegacyIfaceError>> {
399 host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
400 }
401
402 fn http_fetch(
403 &mut self,
404 req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
405 ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
406 ) -> WasmResult<
407 Result<
408 host_import_v0_2::greentic::host_import::imports::HttpResponse,
409 host_import_v0_2::greentic::host_import::imports::IfaceError,
410 >,
411 > {
412 host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
413 }
414}
415
416impl host_import_v0_6::HostImports for HostState {
417 fn secrets_get(
418 &mut self,
419 key: String,
420 _ctx: Option<types::TenantCtx>,
421 ) -> WasmResult<Result<String, types::IfaceError>> {
422 Ok(self.get_secret(&key).map_err(|err| {
423 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
424 types::IfaceError::Denied
425 }))
426 }
427
428 fn telemetry_emit(
429 &mut self,
430 span_json: String,
431 _ctx: Option<types::TenantCtx>,
432 ) -> WasmResult<()> {
433 if let Some(mock) = &self.mocks
434 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
435 {
436 return Ok(());
437 }
438 tracing::info!(span = %span_json, "telemetry emit from pack");
439 Ok(())
440 }
441
442 fn http_fetch(
443 &mut self,
444 req: host_import_v0_6::http::HttpRequest,
445 _ctx: Option<types::TenantCtx>,
446 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
447 let legacy_req = LegacyHttpRequest {
448 method: req.method,
449 url: req.url,
450 headers_json: req.headers_json,
451 body: req.body,
452 };
453 match host_import_v0_2::HostImports::http_fetch(self, legacy_req, None)? {
454 Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
455 status: resp.status,
456 headers_json: resp.headers_json,
457 body: resp.body,
458 })),
459 Err(err) => Ok(Err(map_legacy_error(err))),
460 }
461 }
462
463 fn mcp_exec(
464 &mut self,
465 component: String,
466 action: String,
467 args_json: String,
468 ctx: Option<types::TenantCtx>,
469 ) -> WasmResult<Result<String, types::IfaceError>> {
470 let _ = (component, action, args_json, ctx);
471 tracing::warn!("mcp.exec requested but MCP bridge is removed; returning unavailable");
472 Ok(Err(types::IfaceError::Unavailable))
473 }
474
475 fn state_get(
476 &mut self,
477 key: iface_types::StateKey,
478 ctx: Option<types::TenantCtx>,
479 ) -> WasmResult<Result<String, types::IfaceError>> {
480 let store = match self.state_store_handle() {
481 Ok(store) => store,
482 Err(err) => return Ok(Err(err)),
483 };
484 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
485 Ok(ctx) => ctx,
486 Err(err) => {
487 tracing::warn!(error = %err, "invalid tenant context for state.get");
488 return Ok(Err(types::IfaceError::InvalidArg));
489 }
490 };
491 let key = StoreStateKey::from(key);
492 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
493 Ok(Some(value)) => {
494 let result = if let Some(text) = value.as_str() {
495 text.to_string()
496 } else {
497 serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
498 };
499 Ok(Ok(result))
500 }
501 Ok(None) => Ok(Err(types::IfaceError::NotFound)),
502 Err(err) => {
503 tracing::warn!(error = %err, "state.get failed");
504 Ok(Err(types::IfaceError::Internal))
505 }
506 }
507 }
508
509 fn state_set(
510 &mut self,
511 key: iface_types::StateKey,
512 value_json: String,
513 ctx: Option<types::TenantCtx>,
514 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
515 let store = match self.state_store_handle() {
516 Ok(store) => store,
517 Err(err) => return Ok(Err(err)),
518 };
519 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
520 Ok(ctx) => ctx,
521 Err(err) => {
522 tracing::warn!(error = %err, "invalid tenant context for state.set");
523 return Ok(Err(types::IfaceError::InvalidArg));
524 }
525 };
526 let key = StoreStateKey::from(key);
527 let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
528 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
529 Ok(()) => Ok(Ok(state::OpAck::Ok)),
530 Err(err) => {
531 tracing::warn!(error = %err, "state.set failed");
532 Ok(Err(types::IfaceError::Internal))
533 }
534 }
535 }
536
537 fn session_update(
538 &mut self,
539 cursor: iface_types::SessionCursor,
540 ctx: Option<types::TenantCtx>,
541 ) -> WasmResult<Result<String, types::IfaceError>> {
542 let store = match self.session_store_handle() {
543 Ok(store) => store,
544 Err(err) => return Ok(Err(err)),
545 };
546 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
547 Ok(ctx) => ctx,
548 Err(err) => {
549 tracing::warn!(error = %err, "invalid tenant context for session.update");
550 return Ok(Err(types::IfaceError::InvalidArg));
551 }
552 };
553 let user = match Self::ensure_user(&tenant_ctx) {
554 Ok(user) => user,
555 Err(err) => return Ok(Err(err)),
556 };
557 let flow_id = match Self::ensure_flow(&tenant_ctx) {
558 Ok(flow) => flow,
559 Err(err) => return Ok(Err(err)),
560 };
561 let cursor = Self::cursor_from_iface(cursor);
562 let payload = SessionData {
563 tenant_ctx: tenant_ctx.clone(),
564 flow_id,
565 cursor: cursor.clone(),
566 context_json: serde_json::json!({
567 "node_pointer": cursor.node_pointer,
568 "wait_reason": cursor.wait_reason,
569 "outbox_marker": cursor.outbox_marker,
570 })
571 .to_string(),
572 };
573 if let Some(existing) = tenant_ctx.session_id() {
574 let key = StoreSessionKey::from(existing.to_string());
575 if let Err(err) = store.update_session(&key, payload) {
576 tracing::error!(error = %err, "failed to update session snapshot");
577 return Ok(Err(types::IfaceError::Internal));
578 }
579 return Ok(Ok(existing.to_string()));
580 }
581 match store.find_by_user(&tenant_ctx, &user) {
582 Ok(Some((key, _))) => {
583 if let Err(err) = store.update_session(&key, payload) {
584 tracing::error!(error = %err, "failed to update existing user session");
585 return Ok(Err(types::IfaceError::Internal));
586 }
587 return Ok(Ok(key.to_string()));
588 }
589 Ok(None) => {}
590 Err(err) => {
591 tracing::error!(error = %err, "session lookup failed");
592 return Ok(Err(types::IfaceError::Internal));
593 }
594 }
595 let key = match store.create_session(&tenant_ctx, payload.clone()) {
596 Ok(key) => key,
597 Err(err) => {
598 tracing::error!(error = %err, "failed to create session");
599 return Ok(Err(types::IfaceError::Internal));
600 }
601 };
602 let ctx_with_session = tenant_ctx.with_session(key.to_string());
603 let updated_payload = SessionData {
604 tenant_ctx: ctx_with_session.clone(),
605 ..payload
606 };
607 if let Err(err) = store.update_session(&key, updated_payload) {
608 tracing::warn!(error = %err, "failed to stamp session id after create");
609 }
610 Ok(Ok(key.to_string()))
611 }
612}
613
614impl host_import_v0_2::HostImports for HostState {
615 fn secrets_get(
616 &mut self,
617 key: String,
618 _ctx: Option<LegacyTenantCtx>,
619 ) -> WasmResult<Result<String, LegacyIfaceError>> {
620 Ok(self.get_secret(&key).map_err(|err| {
621 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
622 LegacyIfaceError::Denied
623 }))
624 }
625
626 fn telemetry_emit(
627 &mut self,
628 span_json: String,
629 _ctx: Option<LegacyTenantCtx>,
630 ) -> WasmResult<()> {
631 if let Some(mock) = &self.mocks
632 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
633 {
634 return Ok(());
635 }
636 tracing::info!(span = %span_json, "telemetry emit from pack");
637 Ok(())
638 }
639
640 fn tool_invoke(
641 &mut self,
642 tool: String,
643 action: String,
644 args_json: String,
645 ctx: Option<LegacyTenantCtx>,
646 ) -> WasmResult<Result<String, LegacyIfaceError>> {
647 let _ = (tool, action, args_json, ctx);
648 tracing::warn!("tool invoke requested but MCP bridge is removed; returning unavailable");
649 Ok(Err(LegacyIfaceError::Unavailable))
650 }
651
652 fn http_fetch(
653 &mut self,
654 req: LegacyHttpRequest,
655 _ctx: Option<LegacyTenantCtx>,
656 ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
657 if !self.config.http_enabled {
658 tracing::warn!(url = %req.url, "http fetch denied by policy");
659 return Ok(Err(LegacyIfaceError::Denied));
660 }
661
662 let mut mock_state = None;
663 let raw_body = req.body.clone();
664 if let Some(mock) = &self.mocks
665 && let Ok(meta) = HttpMockRequest::new(
666 &req.method,
667 &req.url,
668 raw_body.as_deref().map(|body| body.as_bytes()),
669 )
670 {
671 match mock.http_begin(&meta) {
672 HttpDecision::Mock(response) => {
673 let http = LegacyHttpResponse::from(&response);
674 return Ok(Ok(http));
675 }
676 HttpDecision::Deny(reason) => {
677 tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
678 return Ok(Err(LegacyIfaceError::Denied));
679 }
680 HttpDecision::Passthrough { record } => {
681 mock_state = Some((meta, record));
682 }
683 }
684 }
685
686 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
687 let mut builder = self.http_client.request(method, &req.url);
688
689 if let Some(headers_json) = req.headers_json.as_ref() {
690 match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
691 Ok(map) => {
692 for (key, value) in map {
693 if let Some(val) = value.as_str()
694 && let Ok(header) =
695 reqwest::header::HeaderName::from_bytes(key.as_bytes())
696 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
697 {
698 builder = builder.header(header, header_value);
699 }
700 }
701 }
702 Err(err) => {
703 tracing::warn!(error = %err, "failed to parse headers for http.fetch");
704 }
705 }
706 }
707
708 if let Some(body) = raw_body.clone() {
709 builder = builder.body(body);
710 }
711
712 let response = match builder.send() {
713 Ok(resp) => resp,
714 Err(err) => {
715 tracing::error!(url = %req.url, error = %err, "http fetch failed");
716 return Ok(Err(LegacyIfaceError::Unavailable));
717 }
718 };
719
720 let status = response.status().as_u16();
721 let headers_map = response
722 .headers()
723 .iter()
724 .map(|(k, v)| {
725 (
726 k.as_str().to_string(),
727 v.to_str().unwrap_or_default().to_string(),
728 )
729 })
730 .collect::<BTreeMap<_, _>>();
731 let headers_json = serde_json::to_string(&headers_map).ok();
732 let body = response.text().ok();
733
734 if let Some((meta, true)) = mock_state.take()
735 && let Some(mock) = &self.mocks
736 {
737 let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
738 mock.http_record(&meta, &recorded);
739 }
740
741 Ok(Ok(LegacyHttpResponse {
742 status,
743 headers_json,
744 body,
745 }))
746 }
747}
748
749impl PackRuntime {
750 #[allow(clippy::too_many_arguments)]
751 pub async fn load(
752 path: impl AsRef<Path>,
753 config: Arc<HostConfig>,
754 mocks: Option<Arc<MockLayer>>,
755 archive_source: Option<&Path>,
756 session_store: Option<DynSessionStore>,
757 state_store: Option<DynStateStore>,
758 wasi_policy: Arc<RunnerWasiPolicy>,
759 secrets: DynSecretsManager,
760 oauth_config: Option<OAuthBrokerConfig>,
761 verify_archive: bool,
762 ) -> Result<Self> {
763 let path = path.as_ref();
764 let is_component = path
765 .extension()
766 .and_then(|ext| ext.to_str())
767 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
768 .unwrap_or(false);
769 let archive_hint = archive_source.or(if is_component { None } else { Some(path) });
770 if verify_archive {
771 let verify_target = archive_hint.unwrap_or(path);
772 verify::verify_pack(verify_target).await?;
773 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
774 }
775 let engine = Engine::default();
776 let wasm_bytes = fs::read(path).await?;
777 let mut metadata =
778 PackMetadata::from_wasm(&wasm_bytes).unwrap_or_else(|| PackMetadata::fallback(path));
779 let mut manifest = None;
780 let flows = if let Some(archive_path) = archive_hint {
781 match open_pack(archive_path, SigningPolicy::DevOk) {
782 Ok(pack) => {
783 metadata = PackMetadata::from_manifest(&pack.manifest);
784 manifest = Some(pack.manifest);
785 None
786 }
787 Err(err) => {
788 warn!(error = ?err, pack = %archive_path.display(), "failed to parse pack manifest; falling back to legacy flow cache");
789 match PackFlows::from_archive(archive_path) {
790 Ok(cache) => {
791 metadata = cache.metadata.clone();
792 Some(cache)
793 }
794 Err(err) => {
795 warn!(
796 error = %err,
797 pack = %archive_path.display(),
798 "failed to parse pack flows via greentic-pack; falling back to component exports"
799 );
800 None
801 }
802 }
803 }
804 }
805 } else {
806 None
807 };
808 let components = if let Some(archive_path) = archive_hint {
809 match load_components_from_archive(&engine, archive_path) {
810 Ok(map) => map,
811 Err(err) => {
812 warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
813 HashMap::new()
814 }
815 }
816 } else if is_component {
817 let name = path
818 .file_stem()
819 .map(|s| s.to_string_lossy().to_string())
820 .unwrap_or_else(|| "component".to_string());
821 let component = Component::from_binary(&engine, &wasm_bytes)?;
822 let mut map = HashMap::new();
823 map.insert(
824 name.clone(),
825 PackComponent {
826 name,
827 version: metadata.version.clone(),
828 component,
829 },
830 );
831 map
832 } else {
833 HashMap::new()
834 };
835 let http_client = Arc::clone(&HTTP_CLIENT);
836 Ok(Self {
837 path: path.to_path_buf(),
838 config,
839 engine,
840 metadata,
841 manifest,
842 mocks,
843 flows,
844 components,
845 http_client,
846 pre_cache: Mutex::new(HashMap::new()),
847 session_store,
848 state_store,
849 wasi_policy,
850 secrets,
851 oauth_config,
852 })
853 }
854
855 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
856 if let Some(cache) = &self.flows {
857 return Ok(cache.descriptors.clone());
858 }
859 if let Some(manifest) = &self.manifest {
860 let descriptors = manifest
861 .flows
862 .iter()
863 .map(|flow| FlowDescriptor {
864 id: flow.id.clone(),
865 flow_type: flow.kind.clone(),
866 profile: manifest.meta.name.clone(),
867 version: manifest.meta.version.to_string(),
868 description: Some(flow.kind.clone()),
869 })
870 .collect();
871 return Ok(descriptors);
872 }
873 Ok(Vec::new())
874 }
875
876 #[allow(dead_code)]
877 pub async fn run_flow(
878 &self,
879 _flow_id: &str,
880 _input: serde_json::Value,
881 ) -> Result<serde_json::Value> {
882 Ok(serde_json::json!({}))
884 }
885
886 pub async fn invoke_component(
887 &self,
888 component_ref: &str,
889 ctx: ComponentExecCtx,
890 operation: &str,
891 _config_json: Option<String>,
892 input_json: String,
893 ) -> Result<Value> {
894 let pack_component = self
895 .components
896 .get(component_ref)
897 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
898
899 let pre = if let Some(pre) = self.pre_cache.lock().get(component_ref).cloned() {
900 pre
901 } else {
902 let mut linker = Linker::new(&self.engine);
903 register_all(&mut linker, self.oauth_config.is_some())?;
904 add_component_control_to_linker(&mut linker)?;
905 let pre = ComponentPre::new(
906 linker
907 .instantiate_pre(&pack_component.component)
908 .map_err(|err| anyhow!(err))?,
909 )
910 .map_err(|err| anyhow!(err))?;
911 self.pre_cache
912 .lock()
913 .insert(component_ref.to_string(), pre.clone());
914 pre
915 };
916
917 let host_state = HostState::new(
918 Arc::clone(&self.config),
919 Arc::clone(&self.http_client),
920 self.mocks.clone(),
921 self.session_store.clone(),
922 self.state_store.clone(),
923 Arc::clone(&self.secrets),
924 self.oauth_config.clone(),
925 )?;
926 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
927 let mut store = wasmtime::Store::new(&self.engine, store_state);
928 let bindings = pre
929 .instantiate_async(&mut store)
930 .await
931 .map_err(|err| anyhow!(err))?;
932 let node = bindings.greentic_component_node();
933
934 let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
935
936 match result {
937 InvokeResult::Ok(body) => {
938 if body.is_empty() {
939 return Ok(Value::Null);
940 }
941 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
942 }
943 InvokeResult::Err(NodeError {
944 code,
945 message,
946 retryable,
947 backoff_ms,
948 details,
949 }) => {
950 let mut obj = serde_json::Map::new();
951 obj.insert("ok".into(), Value::Bool(false));
952 let mut error = serde_json::Map::new();
953 error.insert("code".into(), Value::String(code));
954 error.insert("message".into(), Value::String(message));
955 error.insert("retryable".into(), Value::Bool(retryable));
956 if let Some(backoff) = backoff_ms {
957 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
958 }
959 if let Some(details) = details {
960 error.insert(
961 "details".into(),
962 serde_json::from_str(&details).unwrap_or(Value::String(details)),
963 );
964 }
965 obj.insert("error".into(), Value::Object(error));
966 Ok(Value::Object(obj))
967 }
968 }
969 }
970
971 pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
972 if let Some(cache) = &self.flows {
973 return cache
974 .flows
975 .get(flow_id)
976 .cloned()
977 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
978 }
979 if let Some(manifest) = &self.manifest {
980 let entry = manifest
981 .flows
982 .iter()
983 .find(|f| f.id == flow_id)
984 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
985 let path = &self.path;
986 let mut archive = ZipArchive::new(File::open(path)?)?;
987 match load_flow_doc(&mut archive, entry).and_then(|doc| {
988 greentic_flow::to_ir(doc.clone())
989 .with_context(|| format!("failed to build IR for flow {}", doc.id))
990 }) {
991 Ok(ir) => return Ok(ir),
992 Err(err) => {
993 warn!(
994 error = %err,
995 flow_id = flow_id,
996 "failed to parse flow from archive; falling back to stub IR"
997 );
998 return Ok(build_stub_flow_ir(&entry.id, &entry.kind));
999 }
1000 }
1001 }
1002 bail!("flow '{flow_id}' not available (pack exports disabled)")
1003 }
1004
1005 pub fn metadata(&self) -> &PackMetadata {
1006 &self.metadata
1007 }
1008
1009 pub fn for_component_test(
1010 components: Vec<(String, PathBuf)>,
1011 flows: HashMap<String, FlowIR>,
1012 config: Arc<HostConfig>,
1013 ) -> Result<Self> {
1014 let engine = Engine::default();
1015 let mut component_map = HashMap::new();
1016 for (name, path) in components {
1017 if !path.exists() {
1018 bail!("component artifact missing: {}", path.display());
1019 }
1020 let wasm_bytes = std::fs::read(&path)?;
1021 let component = Component::from_binary(&engine, &wasm_bytes)
1022 .with_context(|| format!("failed to compile component {}", path.display()))?;
1023 component_map.insert(
1024 name.clone(),
1025 PackComponent {
1026 name,
1027 version: "0.0.0".into(),
1028 component,
1029 },
1030 );
1031 }
1032
1033 let descriptors = flows
1034 .iter()
1035 .map(|(id, ir)| FlowDescriptor {
1036 id: id.clone(),
1037 flow_type: ir.flow_type.clone(),
1038 profile: "test".into(),
1039 version: "0.0.0".into(),
1040 description: None,
1041 })
1042 .collect::<Vec<_>>();
1043 let flows_cache = PackFlows {
1044 descriptors: descriptors.clone(),
1045 flows,
1046 metadata: PackMetadata::fallback(Path::new("component-test")),
1047 };
1048
1049 Ok(Self {
1050 path: PathBuf::new(),
1051 config,
1052 engine,
1053 metadata: PackMetadata::fallback(Path::new("component-test")),
1054 manifest: None,
1055 mocks: None,
1056 flows: Some(flows_cache),
1057 components: component_map,
1058 http_client: Arc::clone(&HTTP_CLIENT),
1059 pre_cache: Mutex::new(HashMap::new()),
1060 session_store: None,
1061 state_store: None,
1062 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1063 secrets: crate::secrets::default_manager(),
1064 oauth_config: None,
1065 })
1066 }
1067}
1068
1069fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
1070 match err {
1071 LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
1072 LegacyIfaceError::NotFound => types::IfaceError::NotFound,
1073 LegacyIfaceError::Denied => types::IfaceError::Denied,
1074 LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
1075 LegacyIfaceError::Internal => types::IfaceError::Internal,
1076 }
1077}
1078
1079struct PackFlows {
1080 descriptors: Vec<FlowDescriptor>,
1081 flows: HashMap<String, FlowIR>,
1082 metadata: PackMetadata,
1083}
1084
1085impl PackFlows {
1086 fn from_archive(path: &Path) -> Result<Self> {
1087 let pack = open_pack(path, SigningPolicy::DevOk)
1088 .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1089 let file = File::open(path)
1090 .with_context(|| format!("failed to open archive {}", path.display()))?;
1091 let mut archive = ZipArchive::new(file)
1092 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1093 let mut flows = HashMap::new();
1094 let mut descriptors = Vec::new();
1095 for flow in &pack.manifest.flows {
1096 match load_flow_entry(&mut archive, flow, &pack.manifest.meta) {
1097 Ok((descriptor, ir)) => {
1098 flows.insert(descriptor.id.clone(), ir);
1099 descriptors.push(descriptor);
1100 }
1101 Err(err) => {
1102 warn!(
1103 error = %err,
1104 flow_id = flow.id,
1105 "failed to parse flow metadata from archive; using stub flow"
1106 );
1107 let stub = build_stub_flow_ir(&flow.id, &flow.kind);
1108 flows.insert(flow.id.clone(), stub.clone());
1109 descriptors.push(FlowDescriptor {
1110 id: flow.id.clone(),
1111 flow_type: flow.kind.clone(),
1112 profile: pack.manifest.meta.name.clone(),
1113 version: pack.manifest.meta.version.to_string(),
1114 description: Some(flow.kind.clone()),
1115 });
1116 }
1117 }
1118 }
1119
1120 Ok(Self {
1121 metadata: PackMetadata::from_manifest(&pack.manifest),
1122 descriptors,
1123 flows,
1124 })
1125 }
1126}
1127
1128fn load_flow_entry(
1129 archive: &mut ZipArchive<File>,
1130 entry: &greentic_pack::builder::FlowEntry,
1131 meta: &greentic_pack::builder::PackMeta,
1132) -> Result<(FlowDescriptor, FlowIR)> {
1133 let doc = load_flow_doc(archive, entry)?;
1134 let ir = greentic_flow::to_ir(doc.clone())
1135 .with_context(|| format!("failed to build IR for flow {}", doc.id))?;
1136 let descriptor = FlowDescriptor {
1137 id: doc.id.clone(),
1138 flow_type: doc.flow_type.clone(),
1139 profile: meta.name.clone(),
1140 version: meta.version.to_string(),
1141 description: doc
1142 .description
1143 .clone()
1144 .or(doc.title.clone())
1145 .or_else(|| Some(entry.kind.clone())),
1146 };
1147 Ok((descriptor, ir))
1148}
1149
1150fn load_flow_doc(
1151 archive: &mut ZipArchive<File>,
1152 entry: &greentic_pack::builder::FlowEntry,
1153) -> Result<greentic_flow::model::FlowDoc> {
1154 if let Ok(bytes) = read_entry(archive, &entry.file_yaml)
1155 && let Ok(doc) = serde_yaml::from_slice(&bytes)
1156 {
1157 return Ok(doc);
1158 }
1159 let json_bytes = read_entry(archive, &entry.file_json)
1160 .with_context(|| format!("missing flow document {}", entry.file_json))?;
1161 let doc = serde_json::from_slice(&json_bytes)
1162 .with_context(|| format!("failed to parse {}", entry.file_json))?;
1163 Ok(doc)
1164}
1165
1166fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1167 let mut file = archive
1168 .by_name(name)
1169 .with_context(|| format!("entry {name} missing from archive"))?;
1170 let mut buf = Vec::new();
1171 file.read_to_end(&mut buf)?;
1172 Ok(buf)
1173}
1174
1175fn load_components_from_archive(
1176 engine: &Engine,
1177 path: &Path,
1178) -> Result<HashMap<String, PackComponent>> {
1179 let pack = open_pack(path, SigningPolicy::DevOk)
1180 .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1181 let mut archive = ZipArchive::new(File::open(path)?)?;
1182 let mut components = HashMap::new();
1183 for entry in &pack.manifest.components {
1184 let bytes = read_entry(&mut archive, &entry.file_wasm)
1185 .with_context(|| format!("missing component {}", entry.file_wasm))?;
1186 let component = Component::from_binary(engine, &bytes)
1187 .with_context(|| format!("failed to compile component {}", entry.name))?;
1188 components.insert(
1189 entry.name.clone(),
1190 PackComponent {
1191 name: entry.name.clone(),
1192 version: entry.version.to_string(),
1193 component,
1194 },
1195 );
1196 }
1197 Ok(components)
1198}
1199
1200#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1201pub struct PackMetadata {
1202 pub pack_id: String,
1203 pub version: String,
1204 #[serde(default)]
1205 pub entry_flows: Vec<String>,
1206}
1207
1208impl PackMetadata {
1209 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1210 let parser = Parser::new(0);
1211 for payload in parser.parse_all(bytes) {
1212 let payload = payload.ok()?;
1213 match payload {
1214 Payload::CustomSection(section) => {
1215 if section.name() == "greentic.manifest"
1216 && let Ok(meta) = Self::from_bytes(section.data())
1217 {
1218 return Some(meta);
1219 }
1220 }
1221 Payload::DataSection(reader) => {
1222 for segment in reader.into_iter().flatten() {
1223 if let Ok(meta) = Self::from_bytes(segment.data) {
1224 return Some(meta);
1225 }
1226 }
1227 }
1228 _ => {}
1229 }
1230 }
1231 None
1232 }
1233
1234 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1235 #[derive(Deserialize)]
1236 struct RawManifest {
1237 pack_id: String,
1238 version: String,
1239 #[serde(default)]
1240 entry_flows: Vec<String>,
1241 #[serde(default)]
1242 flows: Vec<RawFlow>,
1243 }
1244
1245 #[derive(Deserialize)]
1246 struct RawFlow {
1247 id: String,
1248 }
1249
1250 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1251 let mut entry_flows = if manifest.entry_flows.is_empty() {
1252 manifest.flows.iter().map(|f| f.id.clone()).collect()
1253 } else {
1254 manifest.entry_flows.clone()
1255 };
1256 entry_flows.retain(|id| !id.is_empty());
1257 Ok(Self {
1258 pack_id: manifest.pack_id,
1259 version: manifest.version,
1260 entry_flows,
1261 })
1262 }
1263
1264 pub fn fallback(path: &Path) -> Self {
1265 let pack_id = path
1266 .file_stem()
1267 .map(|s| s.to_string_lossy().into_owned())
1268 .unwrap_or_else(|| "unknown-pack".to_string());
1269 Self {
1270 pack_id,
1271 version: "0.0.0".to_string(),
1272 entry_flows: Vec::new(),
1273 }
1274 }
1275
1276 pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1277 let entry_flows = if manifest.meta.entry_flows.is_empty() {
1278 manifest
1279 .flows
1280 .iter()
1281 .map(|flow| flow.id.clone())
1282 .collect::<Vec<_>>()
1283 } else {
1284 manifest.meta.entry_flows.clone()
1285 };
1286 Self {
1287 pack_id: manifest.meta.pack_id.clone(),
1288 version: manifest.meta.version.to_string(),
1289 entry_flows,
1290 }
1291 }
1292}
1293
1294fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1295 let mut nodes = IndexMap::new();
1296 nodes.insert(
1297 "complete".into(),
1298 NodeIR {
1299 component: "component.exec".into(),
1300 payload_expr: serde_json::json!({
1301 "component": "qa.process",
1302 "operation": "process",
1303 "input": {
1304 "status": "done",
1305 "flow_id": flow_id,
1306 }
1307 }),
1308 routes: vec![RouteIR {
1309 to: None,
1310 out: true,
1311 }],
1312 },
1313 );
1314 FlowIR {
1315 id: flow_id.to_string(),
1316 flow_type: flow_type.to_string(),
1317 start: Some("complete".into()),
1318 parameters: Value::Object(Default::default()),
1319 nodes,
1320 }
1321}
1322
1323impl From<&HttpMockResponse> for LegacyHttpResponse {
1324 fn from(value: &HttpMockResponse) -> Self {
1325 let headers_json = serde_json::to_string(&value.headers).ok();
1326 Self {
1327 status: value.status,
1328 headers_json,
1329 body: value.body.clone(),
1330 }
1331 }
1332}