1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
9use crate::component_api::{
10 ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::imports::register_all;
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, WasmResult};
15use anyhow::{Context, Result, anyhow, bail};
16use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
17use greentic_interfaces_host::host_import::v0_2 as host_import_v0_2;
18use greentic_interfaces_host::host_import::v0_2::greentic::host_import::imports::{
19 HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
20 IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
21};
22use greentic_interfaces_host::host_import::v0_6::{
23 self as host_import_v0_6, iface_types, state, types,
24};
25use greentic_pack::reader::{SigningPolicy, open_pack};
26use greentic_session::SessionKey as StoreSessionKey;
27use greentic_types::{
28 EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
29 TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
30};
31use indexmap::IndexMap;
32use once_cell::sync::Lazy;
33use parking_lot::Mutex;
34use reqwest::blocking::Client as BlockingClient;
35use runner_core::normalize_under_root;
36use serde::{Deserialize, Serialize};
37use serde_cbor;
38use serde_json::{self, Value};
39use serde_yaml_bw as serde_yaml;
40use tokio::fs;
41use wasmparser::{Parser, Payload};
42use wasmtime::StoreContextMut;
43use zip::ZipArchive;
44
45use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
46
47use crate::config::HostConfig;
48use crate::secrets::{DynSecretsManager, read_secret_blocking};
49use crate::storage::state::STATE_PREFIX;
50use crate::storage::{DynSessionStore, DynStateStore};
51use crate::verify;
52use crate::wasi::RunnerWasiPolicy;
53use tracing::warn;
54use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
55
56#[allow(dead_code)]
57pub struct PackRuntime {
58 path: PathBuf,
60 archive_path: Option<PathBuf>,
62 config: Arc<HostConfig>,
63 engine: Engine,
64 metadata: PackMetadata,
65 manifest: Option<greentic_pack::builder::PackManifest>,
66 mocks: Option<Arc<MockLayer>>,
67 flows: Option<PackFlows>,
68 components: HashMap<String, PackComponent>,
69 http_client: Arc<BlockingClient>,
70 pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
71 session_store: Option<DynSessionStore>,
72 state_store: Option<DynStateStore>,
73 wasi_policy: Arc<RunnerWasiPolicy>,
74 secrets: DynSecretsManager,
75 oauth_config: Option<OAuthBrokerConfig>,
76}
77
78struct PackComponent {
79 #[allow(dead_code)]
80 name: String,
81 #[allow(dead_code)]
82 version: String,
83 component: Component,
84}
85
86fn build_blocking_client() -> BlockingClient {
87 std::thread::spawn(|| BlockingClient::builder().build().expect("blocking client"))
88 .join()
89 .expect("client build thread panicked")
90}
91
92fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
93 let (root, candidate) = if path.is_absolute() {
94 let parent = path
95 .parent()
96 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
97 let root = parent
98 .canonicalize()
99 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
100 let file = path
101 .file_name()
102 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
103 (root, PathBuf::from(file))
104 } else {
105 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
106 let base = if let Some(parent) = path.parent() {
107 cwd.join(parent)
108 } else {
109 cwd
110 };
111 let root = base
112 .canonicalize()
113 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
114 let file = path
115 .file_name()
116 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
117 (root, PathBuf::from(file))
118 };
119 let safe = normalize_under_root(&root, &candidate)?;
120 Ok((root, safe))
121}
122
123static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct FlowDescriptor {
127 pub id: String,
128 #[serde(rename = "type")]
129 pub flow_type: String,
130 pub profile: String,
131 pub version: String,
132 #[serde(default)]
133 pub description: Option<String>,
134}
135
136pub struct HostState {
137 config: Arc<HostConfig>,
138 http_client: Arc<BlockingClient>,
139 default_env: String,
140 session_store: Option<DynSessionStore>,
141 state_store: Option<DynStateStore>,
142 mocks: Option<Arc<MockLayer>>,
143 secrets: DynSecretsManager,
144 oauth_config: Option<OAuthBrokerConfig>,
145 oauth_host: OAuthBrokerHost,
146}
147
148impl HostState {
149 #[allow(clippy::default_constructed_unit_structs)]
150 pub fn new(
151 config: Arc<HostConfig>,
152 http_client: Arc<BlockingClient>,
153 mocks: Option<Arc<MockLayer>>,
154 session_store: Option<DynSessionStore>,
155 state_store: Option<DynStateStore>,
156 secrets: DynSecretsManager,
157 oauth_config: Option<OAuthBrokerConfig>,
158 ) -> Result<Self> {
159 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
160 Ok(Self {
161 config,
162 http_client,
163 default_env,
164 session_store,
165 state_store,
166 mocks,
167 secrets,
168 oauth_config,
169 oauth_host: OAuthBrokerHost::default(),
170 })
171 }
172
173 pub fn get_secret(&self, key: &str) -> Result<String> {
174 if !self.config.secrets_policy.is_allowed(key) {
175 bail!("secret {key} is not permitted by bindings policy");
176 }
177 if let Some(mock) = &self.mocks
178 && let Some(value) = mock.secrets_lookup(key)
179 {
180 return Ok(value);
181 }
182 let bytes = read_secret_blocking(&self.secrets, key)
183 .context("failed to read secret from manager")?;
184 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
185 Ok(value)
186 }
187
188 fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
189 let tenant_raw = ctx
190 .as_ref()
191 .map(|ctx| ctx.tenant.clone())
192 .unwrap_or_else(|| self.config.tenant.clone());
193 let tenant_id = TenantId::from_str(&tenant_raw)
194 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
195 let env_id = EnvId::from_str(&self.default_env)
196 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
197 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
198 if let Some(ctx) = ctx {
199 if let Some(team) = ctx.team {
200 let team_id =
201 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
202 tenant_ctx = tenant_ctx.with_team(Some(team_id));
203 }
204 if let Some(user) = ctx.user {
205 let user_id =
206 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
207 tenant_ctx = tenant_ctx.with_user(Some(user_id));
208 }
209 if let Some(flow) = ctx.flow_id {
210 tenant_ctx = tenant_ctx.with_flow(flow);
211 }
212 if let Some(node) = ctx.node_id {
213 tenant_ctx = tenant_ctx.with_node(node);
214 }
215 if let Some(provider) = ctx.provider_id {
216 tenant_ctx = tenant_ctx.with_provider(provider);
217 }
218 if let Some(session) = ctx.session_id {
219 tenant_ctx = tenant_ctx.with_session(session);
220 }
221 tenant_ctx.trace_id = ctx.trace_id;
222 }
223 Ok(tenant_ctx)
224 }
225
226 fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
227 self.session_store
228 .as_ref()
229 .cloned()
230 .ok_or(types::IfaceError::Unavailable)
231 }
232
233 fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
234 self.state_store
235 .as_ref()
236 .cloned()
237 .ok_or(types::IfaceError::Unavailable)
238 }
239
240 fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
241 ctx.user_id
242 .clone()
243 .or_else(|| ctx.user.clone())
244 .ok_or(types::IfaceError::InvalidArg)
245 }
246
247 fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
248 let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
249 FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
250 }
251
252 fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
253 let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
254 if let Some(reason) = cursor.wait_reason {
255 store_cursor = store_cursor.with_wait_reason(reason);
256 }
257 if let Some(marker) = cursor.outbox_marker {
258 store_cursor = store_cursor.with_outbox_marker(marker);
259 }
260 store_cursor
261 }
262}
263
264pub struct ComponentState {
265 host: HostState,
266 wasi_ctx: WasiCtx,
267 resource_table: ResourceTable,
268}
269
270impl ComponentState {
271 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
272 let wasi_ctx = policy
273 .instantiate()
274 .context("failed to build WASI context")?;
275 Ok(Self {
276 host,
277 wasi_ctx,
278 resource_table: ResourceTable::new(),
279 })
280 }
281
282 fn host_mut(&mut self) -> &mut HostState {
283 &mut self.host
284 }
285}
286
287impl control::Host for ComponentState {
288 fn should_cancel(&mut self) -> bool {
289 false
290 }
291
292 fn yield_now(&mut self) {
293 }
295}
296
297fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
298 let mut inst = linker.instance("greentic:component/control@0.4.0")?;
299 inst.func_wrap(
300 "should-cancel",
301 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
302 let host = caller.data_mut();
303 Ok((ComponentControlHost::should_cancel(host),))
304 },
305 )?;
306 inst.func_wrap(
307 "yield-now",
308 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
309 let host = caller.data_mut();
310 ComponentControlHost::yield_now(host);
311 Ok(())
312 },
313 )?;
314 Ok(())
315}
316
317impl OAuthHostContext for ComponentState {
318 fn tenant_id(&self) -> &str {
319 &self.host.config.tenant
320 }
321
322 fn env(&self) -> &str {
323 &self.host.default_env
324 }
325
326 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
327 &mut self.host.oauth_host
328 }
329
330 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
331 self.host.oauth_config.as_ref()
332 }
333}
334
335impl WasiView for ComponentState {
336 fn ctx(&mut self) -> WasiCtxView<'_> {
337 WasiCtxView {
338 ctx: &mut self.wasi_ctx,
339 table: &mut self.resource_table,
340 }
341 }
342}
343
344#[allow(unsafe_code)]
345unsafe impl Send for ComponentState {}
346#[allow(unsafe_code)]
347unsafe impl Sync for ComponentState {}
348
349impl host_import_v0_6::HostImports for ComponentState {
350 fn secrets_get(
351 &mut self,
352 key: String,
353 ctx: Option<types::TenantCtx>,
354 ) -> WasmResult<Result<String, types::IfaceError>> {
355 host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
356 }
357
358 fn telemetry_emit(
359 &mut self,
360 span_json: String,
361 ctx: Option<types::TenantCtx>,
362 ) -> WasmResult<()> {
363 host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
364 }
365
366 fn http_fetch(
367 &mut self,
368 req: host_import_v0_6::http::HttpRequest,
369 ctx: Option<types::TenantCtx>,
370 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
371 host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
372 }
373
374 fn mcp_exec(
375 &mut self,
376 component: String,
377 action: String,
378 args_json: String,
379 ctx: Option<types::TenantCtx>,
380 ) -> WasmResult<Result<String, types::IfaceError>> {
381 host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
382 }
383
384 fn state_get(
385 &mut self,
386 key: iface_types::StateKey,
387 ctx: Option<types::TenantCtx>,
388 ) -> WasmResult<Result<String, types::IfaceError>> {
389 host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
390 }
391
392 fn state_set(
393 &mut self,
394 key: iface_types::StateKey,
395 value_json: String,
396 ctx: Option<types::TenantCtx>,
397 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
398 host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
399 }
400
401 fn session_update(
402 &mut self,
403 cursor: iface_types::SessionCursor,
404 ctx: Option<types::TenantCtx>,
405 ) -> WasmResult<Result<String, types::IfaceError>> {
406 host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
407 }
408}
409
410impl host_import_v0_2::HostImports for ComponentState {
411 fn secrets_get(
412 &mut self,
413 key: String,
414 ctx: Option<LegacyTenantCtx>,
415 ) -> WasmResult<Result<String, LegacyIfaceError>> {
416 host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
417 }
418
419 fn telemetry_emit(
420 &mut self,
421 span_json: String,
422 ctx: Option<LegacyTenantCtx>,
423 ) -> WasmResult<()> {
424 host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
425 }
426
427 fn tool_invoke(
428 &mut self,
429 tool: String,
430 action: String,
431 args_json: String,
432 ctx: Option<LegacyTenantCtx>,
433 ) -> WasmResult<Result<String, LegacyIfaceError>> {
434 host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
435 }
436
437 fn http_fetch(
438 &mut self,
439 req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
440 ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
441 ) -> WasmResult<
442 Result<
443 host_import_v0_2::greentic::host_import::imports::HttpResponse,
444 host_import_v0_2::greentic::host_import::imports::IfaceError,
445 >,
446 > {
447 host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
448 }
449}
450
451impl host_import_v0_6::HostImports for HostState {
452 fn secrets_get(
453 &mut self,
454 key: String,
455 _ctx: Option<types::TenantCtx>,
456 ) -> WasmResult<Result<String, types::IfaceError>> {
457 Ok(self.get_secret(&key).map_err(|err| {
458 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
459 types::IfaceError::Denied
460 }))
461 }
462
463 fn telemetry_emit(
464 &mut self,
465 span_json: String,
466 _ctx: Option<types::TenantCtx>,
467 ) -> WasmResult<()> {
468 if let Some(mock) = &self.mocks
469 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
470 {
471 return Ok(());
472 }
473 tracing::info!(span = %span_json, "telemetry emit from pack");
474 Ok(())
475 }
476
477 fn http_fetch(
478 &mut self,
479 req: host_import_v0_6::http::HttpRequest,
480 _ctx: Option<types::TenantCtx>,
481 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
482 let legacy_req = LegacyHttpRequest {
483 method: req.method,
484 url: req.url,
485 headers_json: req.headers_json,
486 body: req.body,
487 };
488 match host_import_v0_2::HostImports::http_fetch(self, legacy_req, None)? {
489 Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
490 status: resp.status,
491 headers_json: resp.headers_json,
492 body: resp.body,
493 })),
494 Err(err) => Ok(Err(map_legacy_error(err))),
495 }
496 }
497
498 fn mcp_exec(
499 &mut self,
500 component: String,
501 action: String,
502 args_json: String,
503 ctx: Option<types::TenantCtx>,
504 ) -> WasmResult<Result<String, types::IfaceError>> {
505 let _ = (component, action, args_json, ctx);
506 tracing::warn!("mcp.exec requested but MCP bridge is removed; returning unavailable");
507 Ok(Err(types::IfaceError::Unavailable))
508 }
509
510 fn state_get(
511 &mut self,
512 key: iface_types::StateKey,
513 ctx: Option<types::TenantCtx>,
514 ) -> WasmResult<Result<String, types::IfaceError>> {
515 let store = match self.state_store_handle() {
516 Ok(store) => store,
517 Err(err) => return Ok(Err(err)),
518 };
519 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
520 Ok(ctx) => ctx,
521 Err(err) => {
522 tracing::warn!(error = %err, "invalid tenant context for state.get");
523 return Ok(Err(types::IfaceError::InvalidArg));
524 }
525 };
526 let key = StoreStateKey::from(key);
527 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
528 Ok(Some(value)) => {
529 let result = if let Some(text) = value.as_str() {
530 text.to_string()
531 } else {
532 serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
533 };
534 Ok(Ok(result))
535 }
536 Ok(None) => Ok(Err(types::IfaceError::NotFound)),
537 Err(err) => {
538 tracing::warn!(error = %err, "state.get failed");
539 Ok(Err(types::IfaceError::Internal))
540 }
541 }
542 }
543
544 fn state_set(
545 &mut self,
546 key: iface_types::StateKey,
547 value_json: String,
548 ctx: Option<types::TenantCtx>,
549 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
550 let store = match self.state_store_handle() {
551 Ok(store) => store,
552 Err(err) => return Ok(Err(err)),
553 };
554 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
555 Ok(ctx) => ctx,
556 Err(err) => {
557 tracing::warn!(error = %err, "invalid tenant context for state.set");
558 return Ok(Err(types::IfaceError::InvalidArg));
559 }
560 };
561 let key = StoreStateKey::from(key);
562 let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
563 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
564 Ok(()) => Ok(Ok(state::OpAck::Ok)),
565 Err(err) => {
566 tracing::warn!(error = %err, "state.set failed");
567 Ok(Err(types::IfaceError::Internal))
568 }
569 }
570 }
571
572 fn session_update(
573 &mut self,
574 cursor: iface_types::SessionCursor,
575 ctx: Option<types::TenantCtx>,
576 ) -> WasmResult<Result<String, types::IfaceError>> {
577 let store = match self.session_store_handle() {
578 Ok(store) => store,
579 Err(err) => return Ok(Err(err)),
580 };
581 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
582 Ok(ctx) => ctx,
583 Err(err) => {
584 tracing::warn!(error = %err, "invalid tenant context for session.update");
585 return Ok(Err(types::IfaceError::InvalidArg));
586 }
587 };
588 let user = match Self::ensure_user(&tenant_ctx) {
589 Ok(user) => user,
590 Err(err) => return Ok(Err(err)),
591 };
592 let flow_id = match Self::ensure_flow(&tenant_ctx) {
593 Ok(flow) => flow,
594 Err(err) => return Ok(Err(err)),
595 };
596 let cursor = Self::cursor_from_iface(cursor);
597 let payload = SessionData {
598 tenant_ctx: tenant_ctx.clone(),
599 flow_id,
600 cursor: cursor.clone(),
601 context_json: serde_json::json!({
602 "node_pointer": cursor.node_pointer,
603 "wait_reason": cursor.wait_reason,
604 "outbox_marker": cursor.outbox_marker,
605 })
606 .to_string(),
607 };
608 if let Some(existing) = tenant_ctx.session_id() {
609 let key = StoreSessionKey::from(existing.to_string());
610 if let Err(err) = store.update_session(&key, payload) {
611 tracing::error!(error = %err, "failed to update session snapshot");
612 return Ok(Err(types::IfaceError::Internal));
613 }
614 return Ok(Ok(existing.to_string()));
615 }
616 match store.find_by_user(&tenant_ctx, &user) {
617 Ok(Some((key, _))) => {
618 if let Err(err) = store.update_session(&key, payload) {
619 tracing::error!(error = %err, "failed to update existing user session");
620 return Ok(Err(types::IfaceError::Internal));
621 }
622 return Ok(Ok(key.to_string()));
623 }
624 Ok(None) => {}
625 Err(err) => {
626 tracing::error!(error = %err, "session lookup failed");
627 return Ok(Err(types::IfaceError::Internal));
628 }
629 }
630 let key = match store.create_session(&tenant_ctx, payload.clone()) {
631 Ok(key) => key,
632 Err(err) => {
633 tracing::error!(error = %err, "failed to create session");
634 return Ok(Err(types::IfaceError::Internal));
635 }
636 };
637 let ctx_with_session = tenant_ctx.with_session(key.to_string());
638 let updated_payload = SessionData {
639 tenant_ctx: ctx_with_session.clone(),
640 ..payload
641 };
642 if let Err(err) = store.update_session(&key, updated_payload) {
643 tracing::warn!(error = %err, "failed to stamp session id after create");
644 }
645 Ok(Ok(key.to_string()))
646 }
647}
648
649impl host_import_v0_2::HostImports for HostState {
650 fn secrets_get(
651 &mut self,
652 key: String,
653 _ctx: Option<LegacyTenantCtx>,
654 ) -> WasmResult<Result<String, LegacyIfaceError>> {
655 Ok(self.get_secret(&key).map_err(|err| {
656 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
657 LegacyIfaceError::Denied
658 }))
659 }
660
661 fn telemetry_emit(
662 &mut self,
663 span_json: String,
664 _ctx: Option<LegacyTenantCtx>,
665 ) -> WasmResult<()> {
666 if let Some(mock) = &self.mocks
667 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
668 {
669 return Ok(());
670 }
671 tracing::info!(span = %span_json, "telemetry emit from pack");
672 Ok(())
673 }
674
675 fn tool_invoke(
676 &mut self,
677 tool: String,
678 action: String,
679 args_json: String,
680 ctx: Option<LegacyTenantCtx>,
681 ) -> WasmResult<Result<String, LegacyIfaceError>> {
682 let _ = (tool, action, args_json, ctx);
683 tracing::warn!("tool invoke requested but MCP bridge is removed; returning unavailable");
684 Ok(Err(LegacyIfaceError::Unavailable))
685 }
686
687 fn http_fetch(
688 &mut self,
689 req: LegacyHttpRequest,
690 _ctx: Option<LegacyTenantCtx>,
691 ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
692 if !self.config.http_enabled {
693 tracing::warn!(url = %req.url, "http fetch denied by policy");
694 return Ok(Err(LegacyIfaceError::Denied));
695 }
696
697 let mut mock_state = None;
698 let raw_body = req.body.clone();
699 if let Some(mock) = &self.mocks
700 && let Ok(meta) = HttpMockRequest::new(
701 &req.method,
702 &req.url,
703 raw_body.as_deref().map(|body| body.as_bytes()),
704 )
705 {
706 match mock.http_begin(&meta) {
707 HttpDecision::Mock(response) => {
708 let http = LegacyHttpResponse::from(&response);
709 return Ok(Ok(http));
710 }
711 HttpDecision::Deny(reason) => {
712 tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
713 return Ok(Err(LegacyIfaceError::Denied));
714 }
715 HttpDecision::Passthrough { record } => {
716 mock_state = Some((meta, record));
717 }
718 }
719 }
720
721 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
722 let mut builder = self.http_client.request(method, &req.url);
723
724 if let Some(headers_json) = req.headers_json.as_ref() {
725 match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
726 Ok(map) => {
727 for (key, value) in map {
728 if let Some(val) = value.as_str()
729 && let Ok(header) =
730 reqwest::header::HeaderName::from_bytes(key.as_bytes())
731 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
732 {
733 builder = builder.header(header, header_value);
734 }
735 }
736 }
737 Err(err) => {
738 tracing::warn!(error = %err, "failed to parse headers for http.fetch");
739 }
740 }
741 }
742
743 if let Some(body) = raw_body.clone() {
744 builder = builder.body(body);
745 }
746
747 let response = match builder.send() {
748 Ok(resp) => resp,
749 Err(err) => {
750 tracing::error!(url = %req.url, error = %err, "http fetch failed");
751 return Ok(Err(LegacyIfaceError::Unavailable));
752 }
753 };
754
755 let status = response.status().as_u16();
756 let headers_map = response
757 .headers()
758 .iter()
759 .map(|(k, v)| {
760 (
761 k.as_str().to_string(),
762 v.to_str().unwrap_or_default().to_string(),
763 )
764 })
765 .collect::<BTreeMap<_, _>>();
766 let headers_json = serde_json::to_string(&headers_map).ok();
767 let body = response.text().ok();
768
769 if let Some((meta, true)) = mock_state.take()
770 && let Some(mock) = &self.mocks
771 {
772 let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
773 mock.http_record(&meta, &recorded);
774 }
775
776 Ok(Ok(LegacyHttpResponse {
777 status,
778 headers_json,
779 body,
780 }))
781 }
782}
783
784impl PackRuntime {
785 #[allow(clippy::too_many_arguments)]
786 pub async fn load(
787 path: impl AsRef<Path>,
788 config: Arc<HostConfig>,
789 mocks: Option<Arc<MockLayer>>,
790 archive_source: Option<&Path>,
791 session_store: Option<DynSessionStore>,
792 state_store: Option<DynStateStore>,
793 wasi_policy: Arc<RunnerWasiPolicy>,
794 secrets: DynSecretsManager,
795 oauth_config: Option<OAuthBrokerConfig>,
796 verify_archive: bool,
797 ) -> Result<Self> {
798 let path = path.as_ref();
799 let (_pack_root, safe_path) = normalize_pack_path(path)?;
800 let is_component = safe_path
801 .extension()
802 .and_then(|ext| ext.to_str())
803 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
804 .unwrap_or(false);
805 let archive_hint_path = if let Some(source) = archive_source {
806 let (_, normalized) = normalize_pack_path(source)?;
807 Some(normalized)
808 } else if is_component {
809 None
810 } else {
811 Some(safe_path.clone())
812 };
813 let archive_hint = archive_hint_path.as_deref();
814 if verify_archive {
815 let verify_target = archive_hint.unwrap_or(&safe_path);
816 verify::verify_pack(verify_target).await?;
817 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
818 }
819 let engine = Engine::default();
820 let wasm_bytes = fs::read(&safe_path).await?;
821 let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
822 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
823 let mut manifest = None;
824 let flows = if let Some(archive_path) = archive_hint {
825 match open_pack(archive_path, SigningPolicy::DevOk) {
826 Ok(pack) => {
827 metadata = PackMetadata::from_manifest(&pack.manifest);
828 manifest = Some(pack.manifest);
829 None
830 }
831 Err(err) => {
832 warn!(error = ?err, pack = %archive_path.display(), "failed to parse pack manifest; falling back to legacy flow cache");
833 match PackFlows::from_archive(archive_path) {
834 Ok(cache) => {
835 metadata = cache.metadata.clone();
836 Some(cache)
837 }
838 Err(err) => {
839 warn!(
840 error = %err,
841 pack = %archive_path.display(),
842 "failed to parse pack flows via greentic-pack; falling back to component exports"
843 );
844 None
845 }
846 }
847 }
848 }
849 } else {
850 None
851 };
852 let components = if let Some(archive_path) = archive_hint {
853 match load_components_from_archive(&engine, archive_path) {
854 Ok(map) => map,
855 Err(err) => {
856 warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
857 HashMap::new()
858 }
859 }
860 } else if is_component {
861 let name = safe_path
862 .file_stem()
863 .map(|s| s.to_string_lossy().to_string())
864 .unwrap_or_else(|| "component".to_string());
865 let component = Component::from_binary(&engine, &wasm_bytes)?;
866 let mut map = HashMap::new();
867 map.insert(
868 name.clone(),
869 PackComponent {
870 name,
871 version: metadata.version.clone(),
872 component,
873 },
874 );
875 map
876 } else {
877 HashMap::new()
878 };
879 let http_client = Arc::clone(&HTTP_CLIENT);
880 Ok(Self {
881 path: safe_path,
882 archive_path: archive_hint.map(Path::to_path_buf),
883 config,
884 engine,
885 metadata,
886 manifest,
887 mocks,
888 flows,
889 components,
890 http_client,
891 pre_cache: Mutex::new(HashMap::new()),
892 session_store,
893 state_store,
894 wasi_policy,
895 secrets,
896 oauth_config,
897 })
898 }
899
900 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
901 if let Some(cache) = &self.flows {
902 return Ok(cache.descriptors.clone());
903 }
904 if let Some(manifest) = &self.manifest {
905 let descriptors = manifest
906 .flows
907 .iter()
908 .map(|flow| FlowDescriptor {
909 id: flow.id.clone(),
910 flow_type: flow.kind.clone(),
911 profile: manifest.meta.name.clone(),
912 version: manifest.meta.version.to_string(),
913 description: Some(flow.kind.clone()),
914 })
915 .collect();
916 return Ok(descriptors);
917 }
918 Ok(Vec::new())
919 }
920
921 #[allow(dead_code)]
922 pub async fn run_flow(
923 &self,
924 _flow_id: &str,
925 _input: serde_json::Value,
926 ) -> Result<serde_json::Value> {
927 Ok(serde_json::json!({}))
929 }
930
931 pub async fn invoke_component(
932 &self,
933 component_ref: &str,
934 ctx: ComponentExecCtx,
935 operation: &str,
936 _config_json: Option<String>,
937 input_json: String,
938 ) -> Result<Value> {
939 let pack_component = self
940 .components
941 .get(component_ref)
942 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
943
944 let pre = if let Some(pre) = self.pre_cache.lock().get(component_ref).cloned() {
945 pre
946 } else {
947 let mut linker = Linker::new(&self.engine);
948 register_all(&mut linker, self.oauth_config.is_some())?;
949 add_component_control_to_linker(&mut linker)?;
950 let pre = ComponentPre::new(
951 linker
952 .instantiate_pre(&pack_component.component)
953 .map_err(|err| anyhow!(err))?,
954 )
955 .map_err(|err| anyhow!(err))?;
956 self.pre_cache
957 .lock()
958 .insert(component_ref.to_string(), pre.clone());
959 pre
960 };
961
962 let host_state = HostState::new(
963 Arc::clone(&self.config),
964 Arc::clone(&self.http_client),
965 self.mocks.clone(),
966 self.session_store.clone(),
967 self.state_store.clone(),
968 Arc::clone(&self.secrets),
969 self.oauth_config.clone(),
970 )?;
971 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
972 let mut store = wasmtime::Store::new(&self.engine, store_state);
973 let bindings = pre
974 .instantiate_async(&mut store)
975 .await
976 .map_err(|err| anyhow!(err))?;
977 let node = bindings.greentic_component_node();
978
979 let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
980
981 match result {
982 InvokeResult::Ok(body) => {
983 if body.is_empty() {
984 return Ok(Value::Null);
985 }
986 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
987 }
988 InvokeResult::Err(NodeError {
989 code,
990 message,
991 retryable,
992 backoff_ms,
993 details,
994 }) => {
995 let mut obj = serde_json::Map::new();
996 obj.insert("ok".into(), Value::Bool(false));
997 let mut error = serde_json::Map::new();
998 error.insert("code".into(), Value::String(code));
999 error.insert("message".into(), Value::String(message));
1000 error.insert("retryable".into(), Value::Bool(retryable));
1001 if let Some(backoff) = backoff_ms {
1002 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1003 }
1004 if let Some(details) = details {
1005 error.insert(
1006 "details".into(),
1007 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1008 );
1009 }
1010 obj.insert("error".into(), Value::Object(error));
1011 Ok(Value::Object(obj))
1012 }
1013 }
1014 }
1015
1016 pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
1017 if let Some(cache) = &self.flows {
1018 return cache
1019 .flows
1020 .get(flow_id)
1021 .cloned()
1022 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1023 }
1024 if let Some(manifest) = &self.manifest {
1025 let entry = manifest
1026 .flows
1027 .iter()
1028 .find(|f| f.id == flow_id)
1029 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1030 let archive_path = self.archive_path.as_ref().unwrap_or(&self.path);
1031 let mut archive = ZipArchive::new(File::open(archive_path)?)?;
1032 match load_flow_doc(&mut archive, entry).and_then(|doc| {
1033 greentic_flow::to_ir(doc.clone())
1034 .with_context(|| format!("failed to build IR for flow {}", doc.id))
1035 }) {
1036 Ok(ir) => return Ok(ir),
1037 Err(err) => {
1038 warn!(
1039 error = %err,
1040 flow_id = flow_id,
1041 "failed to parse flow from archive; falling back to stub IR"
1042 );
1043 return Ok(build_stub_flow_ir(&entry.id, &entry.kind));
1044 }
1045 }
1046 }
1047 bail!("flow '{flow_id}' not available (pack exports disabled)")
1048 }
1049
1050 pub fn metadata(&self) -> &PackMetadata {
1051 &self.metadata
1052 }
1053
1054 pub fn for_component_test(
1055 components: Vec<(String, PathBuf)>,
1056 flows: HashMap<String, FlowIR>,
1057 config: Arc<HostConfig>,
1058 ) -> Result<Self> {
1059 let engine = Engine::default();
1060 let mut component_map = HashMap::new();
1061 for (name, path) in components {
1062 if !path.exists() {
1063 bail!("component artifact missing: {}", path.display());
1064 }
1065 let wasm_bytes = std::fs::read(&path)?;
1066 let component = Component::from_binary(&engine, &wasm_bytes)
1067 .with_context(|| format!("failed to compile component {}", path.display()))?;
1068 component_map.insert(
1069 name.clone(),
1070 PackComponent {
1071 name,
1072 version: "0.0.0".into(),
1073 component,
1074 },
1075 );
1076 }
1077
1078 let descriptors = flows
1079 .iter()
1080 .map(|(id, ir)| FlowDescriptor {
1081 id: id.clone(),
1082 flow_type: ir.flow_type.clone(),
1083 profile: "test".into(),
1084 version: "0.0.0".into(),
1085 description: None,
1086 })
1087 .collect::<Vec<_>>();
1088 let flows_cache = PackFlows {
1089 descriptors: descriptors.clone(),
1090 flows,
1091 metadata: PackMetadata::fallback(Path::new("component-test")),
1092 };
1093
1094 Ok(Self {
1095 path: PathBuf::new(),
1096 archive_path: None,
1097 config,
1098 engine,
1099 metadata: PackMetadata::fallback(Path::new("component-test")),
1100 manifest: None,
1101 mocks: None,
1102 flows: Some(flows_cache),
1103 components: component_map,
1104 http_client: Arc::clone(&HTTP_CLIENT),
1105 pre_cache: Mutex::new(HashMap::new()),
1106 session_store: None,
1107 state_store: None,
1108 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1109 secrets: crate::secrets::default_manager(),
1110 oauth_config: None,
1111 })
1112 }
1113}
1114
1115fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
1116 match err {
1117 LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
1118 LegacyIfaceError::NotFound => types::IfaceError::NotFound,
1119 LegacyIfaceError::Denied => types::IfaceError::Denied,
1120 LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
1121 LegacyIfaceError::Internal => types::IfaceError::Internal,
1122 }
1123}
1124
1125struct PackFlows {
1126 descriptors: Vec<FlowDescriptor>,
1127 flows: HashMap<String, FlowIR>,
1128 metadata: PackMetadata,
1129}
1130
1131impl PackFlows {
1132 fn from_archive(path: &Path) -> Result<Self> {
1133 let pack = open_pack(path, SigningPolicy::DevOk)
1134 .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1135 let file = File::open(path)
1136 .with_context(|| format!("failed to open archive {}", path.display()))?;
1137 let mut archive = ZipArchive::new(file)
1138 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1139 let mut flows = HashMap::new();
1140 let mut descriptors = Vec::new();
1141 for flow in &pack.manifest.flows {
1142 match load_flow_entry(&mut archive, flow, &pack.manifest.meta) {
1143 Ok((descriptor, ir)) => {
1144 flows.insert(descriptor.id.clone(), ir);
1145 descriptors.push(descriptor);
1146 }
1147 Err(err) => {
1148 warn!(
1149 error = %err,
1150 flow_id = flow.id,
1151 "failed to parse flow metadata from archive; using stub flow"
1152 );
1153 let stub = build_stub_flow_ir(&flow.id, &flow.kind);
1154 flows.insert(flow.id.clone(), stub.clone());
1155 descriptors.push(FlowDescriptor {
1156 id: flow.id.clone(),
1157 flow_type: flow.kind.clone(),
1158 profile: pack.manifest.meta.name.clone(),
1159 version: pack.manifest.meta.version.to_string(),
1160 description: Some(flow.kind.clone()),
1161 });
1162 }
1163 }
1164 }
1165
1166 Ok(Self {
1167 metadata: PackMetadata::from_manifest(&pack.manifest),
1168 descriptors,
1169 flows,
1170 })
1171 }
1172}
1173
1174fn load_flow_entry(
1175 archive: &mut ZipArchive<File>,
1176 entry: &greentic_pack::builder::FlowEntry,
1177 meta: &greentic_pack::builder::PackMeta,
1178) -> Result<(FlowDescriptor, FlowIR)> {
1179 let doc = load_flow_doc(archive, entry)?;
1180 let ir = greentic_flow::to_ir(doc.clone())
1181 .with_context(|| format!("failed to build IR for flow {}", doc.id))?;
1182 let descriptor = FlowDescriptor {
1183 id: doc.id.clone(),
1184 flow_type: doc.flow_type.clone(),
1185 profile: meta.name.clone(),
1186 version: meta.version.to_string(),
1187 description: doc
1188 .description
1189 .clone()
1190 .or(doc.title.clone())
1191 .or_else(|| Some(entry.kind.clone())),
1192 };
1193 Ok((descriptor, ir))
1194}
1195
1196fn load_flow_doc(
1197 archive: &mut ZipArchive<File>,
1198 entry: &greentic_pack::builder::FlowEntry,
1199) -> Result<greentic_flow::model::FlowDoc> {
1200 if let Ok(bytes) = read_entry(archive, &entry.file_yaml)
1201 && let Ok(doc) = serde_yaml::from_slice(&bytes)
1202 {
1203 return Ok(doc);
1204 }
1205 let json_bytes = read_entry(archive, &entry.file_json)
1206 .with_context(|| format!("missing flow document {}", entry.file_json))?;
1207 let doc = serde_json::from_slice(&json_bytes)
1208 .with_context(|| format!("failed to parse {}", entry.file_json))?;
1209 Ok(doc)
1210}
1211
1212fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1213 let mut file = archive
1214 .by_name(name)
1215 .with_context(|| format!("entry {name} missing from archive"))?;
1216 let mut buf = Vec::new();
1217 file.read_to_end(&mut buf)?;
1218 Ok(buf)
1219}
1220
1221fn load_components_from_archive(
1222 engine: &Engine,
1223 path: &Path,
1224) -> Result<HashMap<String, PackComponent>> {
1225 let pack = open_pack(path, SigningPolicy::DevOk)
1226 .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1227 let mut archive = ZipArchive::new(File::open(path)?)?;
1228 let mut components = HashMap::new();
1229 for entry in &pack.manifest.components {
1230 let bytes = read_entry(&mut archive, &entry.file_wasm)
1231 .with_context(|| format!("missing component {}", entry.file_wasm))?;
1232 let component = Component::from_binary(engine, &bytes)
1233 .with_context(|| format!("failed to compile component {}", entry.name))?;
1234 components.insert(
1235 entry.name.clone(),
1236 PackComponent {
1237 name: entry.name.clone(),
1238 version: entry.version.to_string(),
1239 component,
1240 },
1241 );
1242 }
1243 Ok(components)
1244}
1245
1246#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1247pub struct PackMetadata {
1248 pub pack_id: String,
1249 pub version: String,
1250 #[serde(default)]
1251 pub entry_flows: Vec<String>,
1252}
1253
1254impl PackMetadata {
1255 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1256 let parser = Parser::new(0);
1257 for payload in parser.parse_all(bytes) {
1258 let payload = payload.ok()?;
1259 match payload {
1260 Payload::CustomSection(section) => {
1261 if section.name() == "greentic.manifest"
1262 && let Ok(meta) = Self::from_bytes(section.data())
1263 {
1264 return Some(meta);
1265 }
1266 }
1267 Payload::DataSection(reader) => {
1268 for segment in reader.into_iter().flatten() {
1269 if let Ok(meta) = Self::from_bytes(segment.data) {
1270 return Some(meta);
1271 }
1272 }
1273 }
1274 _ => {}
1275 }
1276 }
1277 None
1278 }
1279
1280 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1281 #[derive(Deserialize)]
1282 struct RawManifest {
1283 pack_id: String,
1284 version: String,
1285 #[serde(default)]
1286 entry_flows: Vec<String>,
1287 #[serde(default)]
1288 flows: Vec<RawFlow>,
1289 }
1290
1291 #[derive(Deserialize)]
1292 struct RawFlow {
1293 id: String,
1294 }
1295
1296 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1297 let mut entry_flows = if manifest.entry_flows.is_empty() {
1298 manifest.flows.iter().map(|f| f.id.clone()).collect()
1299 } else {
1300 manifest.entry_flows.clone()
1301 };
1302 entry_flows.retain(|id| !id.is_empty());
1303 Ok(Self {
1304 pack_id: manifest.pack_id,
1305 version: manifest.version,
1306 entry_flows,
1307 })
1308 }
1309
1310 pub fn fallback(path: &Path) -> Self {
1311 let pack_id = path
1312 .file_stem()
1313 .map(|s| s.to_string_lossy().into_owned())
1314 .unwrap_or_else(|| "unknown-pack".to_string());
1315 Self {
1316 pack_id,
1317 version: "0.0.0".to_string(),
1318 entry_flows: Vec::new(),
1319 }
1320 }
1321
1322 pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1323 let entry_flows = if manifest.meta.entry_flows.is_empty() {
1324 manifest
1325 .flows
1326 .iter()
1327 .map(|flow| flow.id.clone())
1328 .collect::<Vec<_>>()
1329 } else {
1330 manifest.meta.entry_flows.clone()
1331 };
1332 Self {
1333 pack_id: manifest.meta.pack_id.clone(),
1334 version: manifest.meta.version.to_string(),
1335 entry_flows,
1336 }
1337 }
1338}
1339
1340fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1341 let mut nodes = IndexMap::new();
1342 nodes.insert(
1343 "complete".into(),
1344 NodeIR {
1345 component: "component.exec".into(),
1346 payload_expr: serde_json::json!({
1347 "component": "qa.process",
1348 "operation": "process",
1349 "input": {
1350 "status": "done",
1351 "flow_id": flow_id,
1352 }
1353 }),
1354 routes: vec![RouteIR {
1355 to: None,
1356 out: true,
1357 }],
1358 },
1359 );
1360 FlowIR {
1361 id: flow_id.to_string(),
1362 flow_type: flow_type.to_string(),
1363 start: Some("complete".into()),
1364 parameters: Value::Object(Default::default()),
1365 nodes,
1366 }
1367}
1368
1369impl From<&HttpMockResponse> for LegacyHttpResponse {
1370 fn from(value: &HttpMockResponse) -> Self {
1371 let headers_json = serde_json::to_string(&value.headers).ok();
1372 Self {
1373 status: value.status,
1374 headers_json,
1375 body: value.body.clone(),
1376 }
1377 }
1378}